3.2. Job Submission and Control

SAGA’s job management module is central to the API. It represents an application/executable running under the management of a resource manager. A resource manager can be anything from the local machine to a remote HPC queueing system to grid and cloud computing services.

The basic usage of the job module is as follows:

# A job.Description object describes the executable/application and its requirements
job_desc = radical.saga.job.Description()
job_desc.executable  = '/bin/sleep'
job_desc.arguments   = ['10']
job_desc.output      = 'myjob.out'
job_desc.error       = 'myjob.err'

# A job.Service object represents the resource manager. In this example we use the 'local' adaptor to represent the local machine
service = radical.saga.job.Service('local://localhost')

# A job is created on a service (resource manager) using the job description
job = service.create_job(job_desc)

# Run the job and wait for it to finish
job.run()
print "Job ID    : %s" % (job.job_id)
job.wait()

# Get some info about the job
print "Job State : %s" % (job.state)
print "Exitcode  : %s" % (job.exit_code)

service.close()

See also

More examples can be found in the individual adaptor sections!

Like all SAGA modules, the job module relies on middleware adaptors to provide bindings to a specific resource manager. Adaptors are implicitly selected via the scheme part of the URL, e.g., local:// in the example above selects the local job adaptor. The Job Service – radical.saga.job.Service section explains this in more detail.

Note

A list of available adaptors and supported resource managers can be found in the Developer Documentation part of this documentation.

The rest of this section is structured as follows:

3.2.1. Job Service – radical.saga.job.Service

class radical.saga.job.Service(rm=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]

The job.Service represents a resource management backend, and as such allows the creation, submission and management of jobs.

A job.Service represents anything which accepts job creation requests, and which manages thus created saga.job.Job instances. That can be a local shell, a remote ssh shell, a cluster queuing system, a IaaS backend – you name it.

The job.Service is identified by an URL, which usually points to the contact endpoint for that service.

Example:

service  = saga.job.Service("fork://localhost")
ids = service.list()

for job_id in ids :
    print(job_id)

    j = service.get_job(job_id)

    if j.get_state() == saga.job.Job.Pending:
        print("pending")
    elif j.get_state() == saga.job.Job.Running:
        print("running")
    else:
        print("job is already final!")

service.close()
__init__(rm, session)[source]

Create a new job.Service instance.

Parameters:
  • rm (string or saga.Url) – resource manager URL

  • session (saga.Session) – an optional session object with security contexts

Return type:

saga.job.Service

close()[source]

Close the job service instance and disconnect from the (remote) job service if necessary. Any subsequent calls to a job service instance after close() was called will fail.

Example:

service = saga.job.Service("fork://localhost")

# do something with the 'service' object, create jobs, etc...

service.close()

service.list() # this call will throw an exception

Warning

While in principle the job service destructor calls close() automatically when a job service instance goes out of scope, you shouldn’t rely on it. Python’s garbage collection can be a bit odd at times, so you should always call close() explicitly. Especially in a multi-threaded program this will help to avoid random errors.

create_job(job_desc)[source]

Create a new job.Job instance from a Description. The resulting job instance is in NEW state.

Parameters:
  • job_desc (saga.job.Description) – job description to create the job from

  • ttype|param_ttype|

Return type:

saga.job.Job or |rtype_ttype|

create_job() accepts a job description, which described the application instance to be created by the backend. The create_job() method is not actually attempting to run the job, but merely parses the job description for syntactic and semantic consistency. The job returned object is thus not in ‘Pending’ or ‘Running’, but rather in ‘New’ state. The actual submission is performed by calling run() on the job object.

Example:

# A job.Description object describes the executable/application
# and its requirements
job_desc = saga.job.Description()
job_desc.executable  = '/bin/sleep'
job_desc.arguments   = ['10']
job_desc.output      = 'myjob.out'
job_desc.error       = 'myjob.err'

service = saga.job.Service('local://localhost')

job = service.create_job(job_desc)

# Run the job and wait for it to finish
job.run()
print("Job ID    : %s" % (job.job_id))
job.wait()

# Get some info about the job
print("Job State : %s" % (job.state))
print("Exitcode  : %s" % (job.exit_code))

service.close()
get_job(job_id)[source]

Return the job object for a given job id.

Parameters:

job_id – The id of the job to retrieve

Return type:

saga.job.Job

Job objects are a local representation of a remote stateful entity. The job.Service supports to reconnect to those remote entities:

service = saga.job.Service("fork://localhost")
j  = service.get_job(my_job_id)

if j.get_state() == saga.job.Job.Pending:
    print("pending")
elif j.get_state() == saga.job.Job.Running:
    print("running")
else:
    print("job is already final!")

service.close()
get_url()[source]

Return the URL this Service instance was created with.

See also

The url property and the get_url() method are semantically equivalent and only duplicated for convenience.

property jobs

list()

Return a list of the jobs that are managed by this Service instance.

See also

The jobs property and the list() method are semantically equivalent.

Ttype:

|param_ttype|

Return type:

list of saga.job.Job

As the job.Service represents a job management backend, list() will return a list of job IDs for all jobs which are known to the backend, and which can potentially be accessed and managed by the application.

Example:

service  = saga.job.Service("fork://localhost")
ids = service.list()

for job_id in ids :
    print(job_id)

service.close()
list()[source]

Return a list of the jobs that are managed by this Service instance.

See also

The jobs property and the list() method are semantically equivalent.

Ttype:

|param_ttype|

Return type:

list of saga.job.Job

As the job.Service represents a job management backend, list() will return a list of job IDs for all jobs which are known to the backend, and which can potentially be accessed and managed by the application.

Example:

service  = saga.job.Service("fork://localhost")
ids = service.list()

for job_id in ids :
    print(job_id)

service.close()
run_job(cmd, host=None)[source]
property url

get_url()

Return the URL this Service instance was created with.

See also

The url property and the get_url() method are semantically equivalent and only duplicated for convenience.

3.2.2. Job Description – radical.saga.job.Description

Warning: There is no guarantee that all middleware adaptors implement all job description attributes. In case a specific attribute is not supported, the create_job() will throw an exception. Please refer to the Developer Documentation documentation for more details and adaptor-specific lists of supported attributes.

class radical.saga.job.Description[source]

Bases: Attributes

The job description class.

clone()[source]

Implements deep copy. u

Unlike the default python assignment (copy object reference), a deep copy will create a new object instance with the same state – after a deep copy, a change on one instance will not affect the other.

SAGA defines the following constants as valid job description attributes:

radical.saga.job.EXECUTABLE

The executable to start once the job starts running:

jd = radical.saga.job.Description()
jd.executable = "/bin/sleep"
Type:

str

radical.saga.job.executable

Same as attribute EXECUTABLE.

radical.saga.job.ARGUMENTS

Arguments to pass to the EXECUTABLE:

jd = radical.saga.job.Description()
jd.arguments = ['--flag1', '--flag2']
Tpye:

list()

radical.saga.job.arguments

Same as attribute ARGUMENTS.

radical.saga.job.ENVIRONMENT

Environment variables to set in the job’s context:

jd = radical.saga.job.Description()
jd.environemnt = {'FOO': 'BAR', 'FREE': 'BEER'}
Type:

dict()

radical.saga.job.environment

Same as attribute ENVIRONMENT.

radical.saga.job.WORKING_DIRECTORY

The working directory of the job:

jd = radical.saga.job.Description()
jd.working_directory = "/scratch/experiments/123/"
Type:

str()

radical.saga.job.working_directory

Same as attribute WORKING_DIRECTORY.

radical.saga.job.OUTPUT

Filename to capture the executable’s STDOUT stream. If output is a relative filename, the file is relative to WORKING_DIRECTORY:

jd = radical.saga.job.Description()
jd.output = "myjob_stdout.txt"
Type:

str()

radical.saga.job.output

Same as attribute OUTPUT.

radical.saga.job.ERROR

Filename to capture the executable’s STDERR stream. If error is a relative filename, the file is relative to WORKING_DIRECTORY:

jd = radical.saga.job.Description()
jd.error = "myjob_stderr.txt"
Type:

str()

radical.saga.job.error

Same as attribute ERROR.

radical.saga.job.FILE_TRANSFER

Files to stage-in before the job starts running and to stage out once the job has finished running. The syntax is as follows:

local_file OPERATOR remote_file

OPERATOR can be one of the following:

  • > copies the local file to the remote fille before the job starts. Overwrites the remote file if it exists.

  • < copies the remote file to the local file after the job finishes. Overwrites the local file if it exists

Example:

jd = radical.saga.job.Description()
jd.input_file_transfer = ["file://localhost/data/input/test.dat > "test.dat",
                          "file://localhost/data/results/1/result.dat < "result1.dat"
                         ]
Type:

list()

radical.saga.job.file_transfer

Same as attribute FILE_TRANSFER.

radical.saga.job.QUEUE

The name of the queue to submit the job to:

jd = radical.saga.job.Description()
jd.queue = "mpi_long"
Type:

str()

radical.saga.job.queue

Same as attribute QUEUE.

radical.saga.job.PROJECT

The name of the project / allocation to charged for the job

jd = radical.saga.job.Description()
jd.project = "TG-XYZ123456"
Type:

str()

radical.saga.job.project

Same as attribute PROJECT.

radical.saga.job.SPMD_VARIATION

Describe me!

Type:

str()

radical.saga.job.spmd_variation

(Property) Same as attribute SPMD_VARIATION.

radical.saga.job.TOTAL_CPU_COUNT = 'TotalCPUCount'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.total_cpu_count

(Property) Same as attribute TOTAL_CPU_COUNT.

Type:

int() or str()

radical.saga.job.TOTAL_GPU_COUNT = 'TotalGPUCount'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.total_gpu_count

(Property) Same as attribute TOTAL_GPU_COUNT.

Type:

int() or str()

radical.saga.job.NUMBER_OF_PROCESSES = 'NumberOfProcesses'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.number_of_processes

(Property) Same as attribute NUMBER_OF_PROCESSES.

Type:

int() or str()

radical.saga.job.PROCESSES_PER_HOST = 'ProcessesPerHost'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.processes_per_host

(Property) Same as attribute PROCESSES_PER_HOST.

Type:

int() or str()

radical.saga.job.THREADS_PER_PROCESS = 'ThreadsPerProcess'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.threads_per_process

(Property) Same as attribute THREADS_PER_PROCESS.

Type:

int() or str()

# NOT IMPLEMENTED.. autodata:: INTERACTIVE

radical.saga.job.CLEANUP = 'Cleanup'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.cleanup

(Property) Same as attribute CLEANUP.

Type:

bool()

radical.saga.job.JOB_START_TIME = 'JobStartTime'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.job_start_time

(Property) Same as attribute JOB_START_TIME.

Type:

UNIX timestamp

radical.saga.job.WALL_TIME_LIMIT = 'WallTimeLimit'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.wall_time_limit

(Property) Same as attribute WALL_TIME_LIMIT.

radical.saga.job.TOTAL_PHYSICAL_MEMORY = 'TotalPhysicalMemory'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.total_physical_memory

(Property) Same as attribute TOTAL_PHYSICAL_MEMORY.

radical.saga.job.SYSTEM_ARCHITECTURE = 'SystemArchitecture'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.system_architecture

(Property) Same as attribute SYSTEM_ARCHITECTURE.

Type:

dict()

radical.saga.job.OPERATING_SYSTEM_TYPE = 'OperatingSystemType'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.operating_system_type

(Property) Same as attribute OPERATIN_SYSTEM_TYPE.

radical.saga.job.CANDIDATE_HOSTS = 'CandidateHosts'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.candidate_hosts

(Property) Same as attribute CANDIDATE_HOSTS.

radical.saga.job.JOB_CONTACT = 'JobContact'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.job_contact

(Property) Same as attribute JOB_CONTACT.

3.2.3. Jobs – radical.saga.job.Job

class radical.saga.job.Job(_method_type='run', _adaptor=None, _adaptor_state={}, _ttype=None)[source]

Bases: Base, Task, Async

Represents a SAGA job as defined in GFD.90

A ‘Job’ represents a running application instance, which may consist of one or more processes. Jobs are created by submitting a Job description to a Job submission system – usually a queuing system, or some other service which spawns jobs on the user’s behalf.

Jobs have a unique ID (see get_job_id()), and are stateful entities – their ‘state’ attribute changes according to a well defined state model:

A job as returned by job.Service.create(jd) is in ‘New’ state – it is not yet submitted to the job submission backend. Once it was submitted, via run(), it will enter the ‘Pending’ state, where it waits to get actually executed by the backend (e.g. waiting in a queue etc). Once the job is actually executed, it enters the ‘Running’ state – only in that state is the job actually consuming resources (CPU, memory, …).

Jobs can leave the ‘Running’ state in three different ways: they finish successfully on their own (‘Done’), they finish unsuccessfully on their own, or get canceled by the job management backend (‘Failed’), or they get actively canceled by the user or the application (‘Canceled’).

The methods defined on the Job object serve two purposes: inspecting the job’s state, and initiating job state transitions.

property ExitCode

The job’s exitcode.

this attribute is only meaningful if the job is in ‘Done’ or ‘Final’ state - for all other job states, this attribute value is undefined.

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

j.run()
j.wait()

if j.get_state() == saga.job.FAILED :
  if j.exitcode == "42" :
      print("Ah, galaxy bypass error!")
  else :
      print("oops!")
property JobID

The job’s identifier.

This attribute is equivalent to the value returned by job.get_job_id()

property ServiceURL

The URL of the saga.job.Service instance managing this job.

This attribute is represents the URL under where the job management service can be contacted which owns the job. The value is equivalent to the service part of the job_id.

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if j.serviceurl == "fork://localhost" :
    print("yes!")
else :
    print("oops!")
cancel(timeout)[source]

Cancel the execution of the job.

Parameters:

timeout (float) – cancel will return after timeout

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if   j.get_state() == saga.job.NEW :
    print("new")
else :
    print("oops!")

j.run()

if   j.get_state() == saga.job.PENDING :
    print("pending")
elif j.get_state() == saga.job.RUNNING :
    print("running")
else :
    print("oops!")

j.cancel()

if   j.get_state() == saga.job.CANCELED :
    print("canceled")
else :
    print("oops!")
property description

get_description()

Return the job description this job was created from.

The returned description can be used to inspect job properties (executable name, arguments, etc.). It can also be used to start identical job instances.

The returned job description will in general reflect the actual state of the running job, and is not necessarily a simple copy of the job description which was used to create the job instance. For example, the environment variables in the returned job description may reflect the actual environment of the running job instance.

Example:

service = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'

j1 = service.create_job(jd)
j1.run()

j2 = service.create_job(j1.get_description())
j2.run()

service.close()
get_description()[source]

Return the job description this job was created from.

The returned description can be used to inspect job properties (executable name, arguments, etc.). It can also be used to start identical job instances.

The returned job description will in general reflect the actual state of the running job, and is not necessarily a simple copy of the job description which was used to create the job instance. For example, the environment variables in the returned job description may reflect the actual environment of the running job instance.

Example:

service = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'

j1 = service.create_job(jd)
j1.run()

j2 = service.create_job(j1.get_description())
j2.run()

service.close()
get_id()[source]

Return the job ID.

get_log(ttype=None)[source]

get_log_string()

Return the job’s log information, ie. backend specific log messages which have been collected during the job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is ultimately undefined.

ttype: saga.task.type enum ret: string / saga.Task

get_log_string()[source]

Return the job’s log information, ie. backend specific log messages which have been collected during the job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is ultimately undefined.

ttype: saga.task.type enum ret: string / saga.Task

THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_log() INSTEAD.

get_name()[source]

Return the job name.

get_state()[source]

Return the current state of the job.

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if   j.get_state() == saga.job.NEW :
    print("new")
else :
    print("oops!")

j.run()

if   j.get_state() == saga.job.PENDING :
    print("pending")
elif j.get_state() == saga.job.RUNNING :
    print("running")
else :
    print("oops!")
get_stderr_string()[source]

Return the job’s STDERR.

ttype: saga.task.type enum ret: string / saga.Task

THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_stderr() INSTEAD.

get_stdout_string()[source]

Return the job’s STDOUT as string.

ttype: saga.task.type enum ret: string / saga.Task

THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_stdout() INSTEAD.

property id

get_id()

Return the job ID.

property log

get_log_string()

Return the job’s log information, ie. backend specific log messages which have been collected during the job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is ultimately undefined.

ttype: saga.task.type enum ret: string / saga.Task

property name

get_name()

Return the job name.

run()[source]

Run (start) the job.

Request that the job is being executed by the backend. If the backend is accepting this run request, the job will move to the ‘Pending’ or ‘Running’ state – otherwise this method will raise an error, and the job will be moved to ‘Failed’.

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if j.get_state() == saga.job.NEW :
    print("new")
else :
    print("oops!")

j.run()

if   j.get_state() == saga.job.PENDING :
    print("pending")
elif j.get_state() == saga.job.RUNNING :
    print("running")
else :
    print("oops!")
signal(signum)[source]

Send a signal to the job.

Parameters:

signum (int) – signal to send

property state

get_state()

Return the current state of the job.

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if   j.get_state() == saga.job.NEW :
    print("new")
else :
    print("oops!")

j.run()

if   j.get_state() == saga.job.PENDING :
    print("pending")
elif j.get_state() == saga.job.RUNNING :
    print("running")
else :
    print("oops!")
wait(timeout)[source]
Parameters:

timeout (float) – wait will return after timeout

Wait for a running job to finish execution.

The optional timeout parameter specifies the time to wait, and accepts the following values:

timeout <  0  : wait forever (block) -- same for 'None'
timeout == 0  : wait not at all (non-blocking test)
timeout >  0  : wait for 'timeout' seconds

On a non-negative timeout, the call can thus return even if the job is not in final state, and the application should check the actual job state. The default timeout value is ‘None’ (blocking).

Example:

js = saga.job.Service("fork://localhost")
jd = saga.job.Description ()
jd.executable = '/bin/date'
j  = js.create_job(jd)

if   j.get_state() == saga.job.NEW :
    print("new")
else :
    print("oops!")

j.run()

if   j.get_state() == saga.job.PENDING :
    print("pending")
elif j.get_state() == saga.job.RUNNING :
    print("running")
else :
    print("oops!")

j.wait(-1.0)

if   j.get_state() == saga.job.DONE :
    print("done")
elif j.get_state() == saga.job.FAILED :
    print("failed")
else :
    print("oops!")

3.2.3.1. Attributes

radical.saga.job.ID = 'ID'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.EXECUTION_HOSTS = 'ExecutionHosts'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.CREATED = 'Created'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.STARTED = 'Started'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.FINISHED = 'Finished'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.EXIT_CODE = 'ExitCode'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

3.2.3.2. States

The job state constants defined describe the possible states a job can be in. The constants can be used to check / compare the state of a job. For example:

if job.state == radical.saga.job.Pending:
    # do_something
elif job.state == radical.saga.job.Running:
    # do_something else

The constants also define the string representation of a state:

>>> str(j.state)
'Running'

SAGA defines the following constants as job states:

radical.saga.job.UNKNOWN = 'Unknown'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.NEW = 'New'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.PENDING = 'Pending'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.RUNNING = 'Running'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.DONE = 'Done'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.CANCELED = 'Canceled'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.FAILED = 'Failed'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.SUSPENDED = 'Suspended'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

3.2.3.3. Metrics

Job metrics provide a way to attach callback functions to a job object. As long as a callback remains registered, it will get triggered whenever a job metric changes.

Callback functions require three parameters:

source:

the watched object instance

metric:

the watched metric (e.g. STATE or STATE_DETAIL)

value:

the new value of the watched metric

Their return value determines if they remain registered (when returning True), or not (when returning False).

Callback functions are attached to a job object via the add_callback() method. For example:

# create a callback function
def state_cb (self, source, metric, value) :
  print "Job %s state changed to %s : %s"  % (source, value)

def main () :
  # register the callback function with the 'State' metric
  job.add_callback (radical.saga.job.STATE, state_cb)
  job.add_callback (radical.saga.job.STATE, state_cb)

Warning: There is no guarantee that all middleware adaptors implement these metrics. In case they are not implemented, you can still subscribe to them, but you won’t receive any callbacks. Please refer to the Developer Documentation documentation for more details and adaptor-specific lists of supported metrics.

RADICAL SAGA defines the following constants as job metrics:

radical.saga.job.STATE = 'State'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

radical.saga.job.STATE_DETAIL = 'StateDetail'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

3.2.4. Job Containers – radical.saga.job.Container

See also

More examples on how to use job containers can be found in the Job Module section of the chapter_code_examples chapter.

class radical.saga.job.Container[source]

Bases: Container

Todo:

document me

jobs

The (read-only) jobs property returns a list of all job objects in the container.

Return type:

saga.job.Job list

size

The (read-only) size property returns the number of job objectis in the container.

Return type:

int

states

The (read-only) states property returns a list of states that represent the states of the individual jobs in the container.

Return type:

list

get_jobs()[source]

This is similar to get_tasks(), but returns only Job typed entries from the container.