Job Module

Basic Job Submission

Describe example

Download Python source for this example.

 1#!/usr/bin/env python
 2
 3__author__    = "Andre Merzky, Ole Weidner"
 4__copyright__ = "Copyright 2012-2013, The SAGA Project"
 5__license__   = "MIT"
 6
 7
 8"""
 9This example shows how to run a job on the local machine
10using the 'local' job adaptor.
11"""
12
13import os
14import sys
15import radical.saga as saga
16
17
def main():
    """Submit 100 'touch' jobs to the local machine and track each one.

    Returns 0 on success and -1 when a SAGA exception is raised.
    """

    try:
        # A job service object represents the resource the jobs run on.
        # The 'fork' scheme in the URL selects the shell adaptor, which
        # also supports ssh:// and gsissh://.
        js = saga.job.Service("fork://localhost")

        for _ in range(100):

            # Describe the work: run 'touch $FILENAME' in a per-user
            # scratch directory.  The complete set of description
            # attributes is documented in the API reference.
            job_desc = saga.job.Description()
            job_desc.environment       = {'FILENAME': 'testfile'}
            job_desc.executable        = '/usr/bin/touch'
            job_desc.arguments         = ['$FILENAME']
            job_desc.working_directory = "/tmp/%d/A/B/C" % os.getuid()
            job_desc.output            = "examplejob.out"
            job_desc.error             = "examplejob.err"

            # A job created from a description starts in the 'New' state.
            touch_job = js.create_job(job_desc)

            # Inspect id and state before starting the job ...
            print("Job ID    : %s" % (touch_job.id))
            print("Job State : %s" % (touch_job.state))

            # ... then start it and inspect them again.
            print("\n...starting job...\n")
            touch_job.run()

            print("Job ID    : %s" % (touch_job.id))
            print("Job State : %s" % (touch_job.state))

            # The service lists every job the adaptor knows about,
            # which should include the one just started.
            print("\nListing active jobs: ")
            for known_job in js.list():
                print(" * %s" % known_job)

            # Reconnect to the job via its id.  That is pointless here,
            # but the disconnect / reconnect pattern is important for
            # long-running jobs.
            reconnected = js.get_job(touch_job.id)

            # Block until the job has completed.
            print("\n...waiting for job...\n")
            reconnected.wait()

            print("Job State   : %s" % (reconnected.state))
            print("Exitcode    : %s" % (reconnected.exit_code))
            print("Exec. hosts : %s" % (reconnected.execution_hosts))
            print("Create time : %s" % (reconnected.created))
            print("Start time  : %s" % (reconnected.started))
            print("End time    : %s" % (reconnected.finished))

        js.close()
        return 0

    except saga.SagaException as ex:
        # Report any SAGA-level failure; the traceback attribute is
        # often helpful when debugging adaptor problems.
        print("An exception occured: %s " % ex)
        print(" \n*** Backtrace:\n %s" % ex.traceback)
        return -1
86
87
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
90

Job Containers

Describe example

Download Python source for this example.

 1#!/usr/bin/env python
 2
 3__author__    = "Andre Merzky, Ole Weidner"
 4__copyright__ = "Copyright 2012-2013, The SAGA Project"
 5__license__   = "MIT"
 6
 7
 8""" This example shows how to run groups of jobs using the
 9    'local' job adaptor. This example uses job containers for
10    simplified and optimized bulk job handling.
11
12    Job container can be used to easily model dependencies between
13    groups of different jobs, e.g., in workflow scenarios. In this example,
14    we execute 'num_job_groups' containers of 'jobs_per_group' number of
15    parallel jobs sequentially::
16
17      C1[j1,j2,j3,j4,...] -> C2[j1,j2,j3,j4,...] -> C3[j1,j2,j3,j4,...] -> ...
18
19    Depending on the adaptor implementation, using job containers can be
20    quite advantageous in terms of call latency. Some adaptors implement
21    special bulk operations for container management, which makes them
22    generally much faster than iterating over and operating on individual jobs.
23"""
24
25import sys
26import random
27import time
28
29import radical.saga as rs
30
31
# Target endpoint for the job service.  The second assignment wins: the
# example defaults to the local machine; swap (or comment out) the lines
# to target the OSG Condor endpoint instead.
URL = "condor+gsissh://xd-login.opensciencegrid.org"
URL = "fork://localhost/"   # was "locahost" -- typo would break the URL
34
def main():
    """Run 'num_job_groups' containers of 'jobs_per_group' sleep jobs.

    The containers are executed sequentially, while the jobs inside each
    container are handled as one bulk operation.  Returns 0 on success
    and -1 when a SAGA exception is raised.
    """

    # number of job 'groups' / containers
    num_job_groups = 2
    # number of jobs per container
    jobs_per_group = 10

    try:
        # All jobs in this example run on the same job service; that is
        # a simplification, not a requirement.
        service = rs.job.Service(URL)
        print(service.url)

        t1 = time.time()

        # Create and populate the containers.
        containers = list()
        for c in range(num_job_groups):
            containers.append(rs.job.Container())
            for j in range(jobs_per_group):
                # To make things a bit more interesting, give each job a
                # random runtime of 10-59 seconds (randrange(10, 60) is
                # exclusive of the upper bound).
                jd = rs.job.Description()
                # Environment values are passed as strings -- convert
                # the runtime explicitly instead of relying on implicit
                # int-to-string conversion in the adaptor.
                jd.environment = {'RUNTIME': str(random.randrange(10, 60))}
                jd.executable  = '/bin/sleep'
                jd.arguments   = ['$RUNTIME']
                # The job name is a plain string (the original wrapped
                # it in a list, which made 'j.name' print as a list).
                jd.name        = 'job.%02d.%03d' % (c, j)
                # Use a distinct variable so the loop index 'j' is not
                # shadowed by the created job.
                job = service.create_job(jd)
                containers[c].add(job)

        # Execute the containers one after the other.
        for c in range(num_job_groups):
            print('Starting container %s ... ' % c)
            containers[c].run()

            for job in containers[c].get_tasks():
                print('%s: %s: %s' % (job.name, job.id, job.state))

            print(containers[c].get_states())
            # NOTE: the jobs are cancelled right after starting, so the
            # states printed below will mostly be final/cancelled ones.
            containers[c].cancel()
            containers[c].wait()
            print(containers[c].get_states())

        t2 = time.time()
        print(t2 - t1)

        service.close()
        return 0

    except rs.SagaException as ex:
        print("An exception occured: %s " % ((str(ex))))
        # The traceback can be helpful when debugging adaptor problems.
        print(" *** %s" % ex.traceback)
        return -1
96
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())