from .output_file import OutputFile
import os
class Job:
    """A job (one workflow run) on a compute service."""

    def __init__(self, service, name, job_id=None, inputs=None,
                 workflow=None, input_desc=None, outputs=None):
        """
        Create a new Job object.

        This will not actually make a job on the service. Use
        Service.create_job() to make a job.

        Args:
            service (Service): The service this job will run on.
            name (str): The name for the job.
            job_id (str): The service-assigned id of this job, or None
                if it has not been started yet.
            inputs (list): List of declared inputs of the workflow.
                Defaults to an empty list.
            workflow (str): The remote path to the uploaded workflow
                file, if already uploaded.
            input_desc (dict): The input description object to be
                submitted. Defaults to an empty dict.
            outputs (dict): Previously cached results. Defaults to an
                empty dict.
        """
        self.id = job_id
        """The service-assigned id of this job."""
        self.name = name
        """The name of this job."""
        self._service = service
        """The service that this job runs on."""
        self._inputs = [] if inputs is None else inputs
        """List of declared inputs of the workflow."""
        self._workflow_url = workflow
        """The remote path to the uploaded workflow file."""
        self._input_desc = {} if input_desc is None else input_desc
        """The input description object to be submitted."""
        self._outputs = {} if outputs is None else outputs
        """Cached results."""

    @property
    def state(self):
        """
        The state that this job is in.

        None if the job has not been started yet. One of 'Waiting',
        'Running', 'Success', 'Cancelled', 'TemporaryFailure',
        'PermanentFailure', or 'SystemError'.
        """
        if self.id is None:
            return None
        return self._service._job_state(self.id)

    @property
    def log(self):
        """
        The job's log, as produced by the service.

        None if the job has not been started yet.
        """
        if self.id is not None:
            return self._service._get_log(self.id)
        return None

    def set_workflow(self, file_path):
        """
        Sets the workflow file.

        Only one workflow file may be submitted; if this function is
        called repeatedly, the last workflow file is used.

        Args:
            file_path (str): The path to the CWL file defining the
                workflow to be run. The file's name must not equal
                that of any file added using add_input_file().

        Raises:
            FileNotFoundError: The workflow file was not found at the
                given path.
        """
        # Fail fast locally instead of relying on the upload step to
        # notice a missing file.
        if not os.path.isfile(file_path):
            raise FileNotFoundError(
                'Workflow file not found: {}'.format(file_path))
        # TODO: scan the workflow for its declared inputs and store them
        # (a {name: type} dict would allow lookup by name when adding
        # inputs); consider separate subdirectories for the workflow and
        # input files so that their names may coincide.
        self._workflow_url = self._service._upload_file(self.name, file_path)

    def run(self):
        """
        Starts the job running on the associated service.

        Submits the job's name, uploaded workflow location and input
        description, and records the id the service returns.

        Returns:
            str: The id given to this job by the service.
        """
        job_desc = {
            'name': self.name,
            'workflow': self._workflow_url,
            'input': self._input_desc
        }
        self.id = self._service._run_job(job_desc)
        return self.id

    def is_running(self):
        """
        Returns True when the job is still running, False when it is
        done (or has not been started).

        Returns:
            boolean: Whether the job is running.
        """
        # Read the state once; each access queries the service.
        return self.state in ('Waiting', 'Running')

    def cancel(self):
        """
        Cancel this job; stop it running at the compute service.

        After this function is called, the job's state will eventually
        become 'Cancelled', unless it was already complete, in which
        case its state will be its normal final state. Does nothing if
        the job has not been started.
        """
        if self.id is not None:
            self._service._cancel_job(self.id)

    @property
    def outputs(self):
        """
        Returns a dictionary of output objects.

        Keys are taken from the names of the outputs in the submitted
        workflows; the values are the corresponding results. If an
        output is of type File, an object of class OutputFile is
        returned as the value. All other values, including dicts with
        a different 'class', are returned unchanged.

        Results are cached after the first non-empty fetch.

        Returns:
            Union[dict, None]: Output values, or None if the job has
            not been started or the service reports no outputs.
        """
        if self._outputs:
            return self._outputs
        if self.id is None:
            # Not submitted yet, so there is nothing to ask the service.
            return None
        raw_outputs = self._service._get_outputs(self.id)
        if raw_outputs is None:
            return None
        for key, value in raw_outputs.items():
            if isinstance(value, dict) and value.get('class', '') == 'File':
                self._outputs[key] = OutputFile(value['location'])
            else:
                # Non-File values (presumably including other CWL objects
                # such as Directory or records) are kept as-is rather
                # than dropped.
                self._outputs[key] = value
        return self._outputs

    def delete(self):
        """
        Deletes the job and all its input and output data from the
        service, then clears this object's id.
        """
        self._service._delete_job(self.name, self.id)
        self.id = None