RADICAL-SAGA 1.60.0 Documentation¶
RADICAL-SAGA is a light-weight Python package that implements OGF SAGA interface specification and provides adaptors for different distributed middleware systems and services. RADICAL-SAGA focuses on usability, extensibility and simple deployment in real-world heterogeneous distributed computing environments and application scenarios.
Get involved or contact us:
RADICAL-SAGA on GitHub
RADICAL-SAGA Mailing List
Contents:¶
Installation and Usage¶
This part of the documentation is devoted to general information on the setup and configuration of SAGA and things that make working with SAGA easier.
Installation¶
Requirements¶
radical.saga has the following requirements:
Python 3.6 or newer
Installation via PyPI¶
radical.saga is available for download via PyPI and may be installed using pip. This automatically downloads and installs all dependencies required by radical.saga if they can’t be found on your system:
pip install radical.saga
Using Virtualenv¶
If you don’t want to (or can’t) install RADICAL-SAGA into your system’s Python environment, there’s a simple (and often preferred) way to create an alternative Python environment (e.g., in your home directory):
virtualenv --no-site-packages $HOME/sagaenv/
. $HOME/sagaenv/bin/activate
pip install radical.saga
What if my system doesn’t come with virtualenv or pip?
There’s a simple workaround for that using the ‘instant’ version of virtualenv. It also installs pip:
wget https://raw.githubusercontent.com/pypa/virtualenv/1.9.1/virtualenv.py
python virtualenv.py $HOME/sagaenv/ --no-site-packages
. $HOME/sagaenv/bin/activate
pip install radical.saga
Using Conda¶
Similar to virtualenv, you can create a Python environment using conda, the package manager that ships with Anaconda. Before moving further, make sure you have Anaconda already installed; see the Anaconda installation documentation for details.
conda create --name env_name python=3.7
conda activate env_name
pip install radical.saga
Installing the Latest Development Version¶
Warning
Please keep in mind that the latest development version of RADICAL-SAGA can be highly unstable or even completely broken. It’s not recommended to use it in a production environment.
You can install the latest development version of RADICAL-SAGA directly from our Git repository using pip:
pip install -e git+https://github.com/radical-cybertools/radical.saga.git@devel#egg=radical.saga
Configuration¶
Note
This section is outdated!
Note
SAGA has been designed as a zero-configuration library. Unless you are experiencing problems with one of the default configuration settings, there’s really no need to create a configuration file for SAGA.
SAGA and its individual middleware adaptors provide various optional configuration options. While SAGA tries to provide sensible default values for the majority of these options (zero-conf), it can sometimes be necessary to modify or extend SAGA’s configuration. SAGA provides two ways to access and modify its configuration: via Configuration Files (recommended) and via the Configuration API (for advanced use-cases).
Configuration Files¶
If you need to make persistent changes to any of SAGA’s configuration options, the simplest option is to create a configuration file. During startup, SAGA checks for the existence of a configuration file in $HOME/.saga.conf. If that configuration file is found, it is parsed by SAGA’s configuration system. SAGA configuration files use a structure that looks like this:
[radical.saga.engine]
option = value
[radical.saga.logger]
option = value
[radical.saga.adaptor.name]
option = value
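The file format above is standard INI syntax, so its structure can be explored with Python's stdlib configparser. The sketch below is purely illustrative: the section and option names are hypothetical placeholders, not real SAGA options.

```python
import configparser

# A hypothetical configuration snippet in the $HOME/.saga.conf format
# shown above (section and option names are made up for illustration).
conf_text = """
[radical.saga.logger]
level = DEBUG

[radical.saga.adaptor.name]
enabled = True
"""

cfg = configparser.ConfigParser()
cfg.read_string(conf_text)

print(cfg.get('radical.saga.logger', 'level'))                 # DEBUG
print(cfg.getboolean('radical.saga.adaptor.name', 'enabled'))  # True
```

Note that SAGA uses its own configuration system internally; this snippet only demonstrates that the file format itself is plain INI.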
Configuration API¶
Module radical.saga.utils.config¶
The config module provides classes and functions to introspect and modify SAGA’s configuration. The getConfig() function is used to get the GlobalConfig object which represents the current configuration of SAGA:

from radical.saga.utils.config import getConfig

sagaconf = getConfig()
print(sagaconf.get_category('saga.utils.logger'))
Logging System¶
In a distributed environment, unified error logging and reporting is a crucial capability for debugging and monitoring. SAGA has a configurable logging system that captures debug, info, warning and error messages across all of its middleware adaptors. The logging system can be controlled in two different ways: via environment variables, which should be sufficient in most scenarios, and via the logging API, which provides programmatic access to the logging system for advanced use-cases.
Environment Variables¶
Several environment variables can be used to control SAGA’s logging behavior from the command line. Obviously, this can come in handy when debugging a problem with an existing SAGA application. Environment variables are set in the executing shell and evaluated by SAGA at program startup.
- RADICAL_SAGA_LOG_LVL¶
Controls the log level, i.e., the amount of output generated by the logging system. RADICAL_SAGA_LOG_LVL expects either a numeric (0-4) value or a string (case insensitive) representing the log level:

Numeric Value   Log Level   Type of Messages Displayed
0 (default)     CRITICAL    Only fatal events that will cause SAGA to abort.
1               ERROR       Errors that will not necessarily cause SAGA to abort.
2               WARNING     Warnings that are generated by SAGA and its middleware adaptors.
3               INFO        Useful (?) runtime information that is generated by SAGA and its middleware adaptors.
4               DEBUG       Debug messages added to the code by the developers. (Lots of output.)

For example, if you want to see the debug messages that SAGA generates during program execution, you would set RADICAL_SAGA_LOG_LVL to DEBUG before you run your program:

RADICAL_SAGA_LOG_LVL=DEBUG python mysagaprog.py
- RADICAL_LOG_LVL¶
Controls the message sources displayed. RCT uses a hierarchical structure for its log sources. Starting with the root logger RADICAL, sub-loggers are defined for internal logging events (RADICAL_SAGA, RADICAL_SAGA_ENGINE, etc.) and individual middleware adaptors, e.g., RADICAL_SAGA_ADAPTORS_NAME. LOG_LVL and LOG_TGT can be set individually for those loggers.

For example, if you want to see only the debug messages generated by saga.engine and a specific middleware adaptor called xyz, you would set the following environment variables:

RADICAL_LOG_LVL=ERROR \                      # mute everything
RADICAL_SAGA_ENGINE_LOG_LVL=DEBUG \          # enable engine logger
RADICAL_SAGA_ADAPTORS_XYZ_LOG_LVL=DEBUG \    # enable XYZ logger
python mysagaprog.py
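This per-source control mirrors the hierarchy of Python's standard logging module, where a child logger inherits the level of its nearest configured ancestor. A small stdlib sketch (the logger names below are illustrative, lower-cased analogues of the environment variables, not RADICAL-SAGA's actual loggers):

```python
import logging

# Mute everything under the (illustrative) root logger 'radical', but
# re-enable full debug output for the engine sub-logger only.
logging.getLogger('radical').setLevel(logging.ERROR)
logging.getLogger('radical.saga.engine').setLevel(logging.DEBUG)

# An adaptor logger without an explicit level inherits ERROR from 'radical';
# the engine logger uses its own DEBUG setting.
adaptor_lvl = logging.getLogger('radical.saga.adaptors.xyz').getEffectiveLevel()
engine_lvl  = logging.getLogger('radical.saga.engine').getEffectiveLevel()

assert adaptor_lvl == logging.ERROR
assert engine_lvl  == logging.DEBUG
```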
- RADICAL_SAGA_LOG_TGT¶
Controls where the log messages go. Multiple concurrent locations are supported. RADICAL_SAGA_LOG_TGT expects either a single location or a comma-separated list of locations, where a location can either be a path/filename or the stdout / stderr keyword (case sensitive) for logging to the console.

For example, if you want to see debug messages on the console but also want to log them in a file for further analysis, you would set the following environment variables:

RADICAL_SAGA_LOG_LVL=DEBUG RADICAL_SAGA_LOG_TGT=stdout,./rs.log \
python mysagaprog.py
Tutorial¶
This tutorial explains the job and filesystem packages, arguably the most widely used capabilities in radical.saga. It covers local as well as remote job submission and management (ssh, pbs, sge) and file operations (sftp).
Prerequisites:
You are familiar with Linux or UNIX
You can read and write Python code
You can use SSH and understand how public and private keys work
You understand the basic concepts of distributed computing
You will learn how to:
Install SAGA on your own machine
Write a program that runs a job locally on your machine
Use the same program with a different plug-in to run the job on a remote site
Add file transfer capabilities to the program to retrieve results
Contents:
Part 1: Introduction¶
The RADICAL-SAGA module provides an object-oriented programming interface for job submission and management, resource allocation, file handling and coordination and communication - functionality that is required in the majority of distributed applications, frameworks and tools.
SAGA encapsulates the complexity and heterogeneity of different distributed computing systems and ‘cyberinfrastructures’ by providing a single, coherent API to the application developer. The so-called adaptor mechanism, transparent to the application, translates the API calls to the different middleware interfaces. A list of available adaptors can be found in the adaptors chapter.
In part 2 of this tutorial, we will start with using the local (fork) job adaptor. In part 3, we use the ssh job adaptor to submit a job to a remote host. In part 4, we use one of the HPC adaptors (sge, slurm, pbs) to submit a job to an HPC cluster. Additionally, we introduce the sftp file adaptor to implement input and output file staging.
Installation¶
Warning
RADICAL-SAGA requires Python >= 3.6. It won’t work with an older version of Python.
Install Virtualenv¶
A small Python command-line tool called virtualenv allows you to create a local Python environment (sandbox) in user space, which allows you to install additional Python packages without having to be ‘root’.
To create your local Python environment run the following command (you can install virtualenv on most systems via apt-get or yum, etc.):
virtualenv $HOME/tutorial
If you don’t have virtualenv installed and you don’t have root access on your machine, you can use the following script instead:
curl --insecure -s https://raw.github.com/pypa/virtualenv/master/virtualenv.py | python - $HOME/tutorial
Note
If you have multiple Python versions installed on your system, you can use the virtualenv --python=PYTHON_EXE
flag to force virtualenv to use a specific version.
Next, you need to activate your Python environment in order to make it work:
source $HOME/tutorial/bin/activate
Activating the virtualenv is very important. If you don’t activate your virtualenv, the rest of this tutorial will not work. You can usually tell that your environment is activated properly if your bash command-line prompt starts with (tutorial).
Install RADICAL-SAGA¶
The latest radical.saga module is available via the Python Package Index (PyPI). PyPI packages are installed very similarly to Linux deb or rpm packages with a tool called pip (which stands for “pip installs packages”). Pip is installed by default in your virtualenv, so in order to install RADICAL-SAGA, the only thing you have to do is this:
pip install radical.saga
To make sure that your installation works, run the following command to check if the radical.saga module can be imported by the interpreter (the output of the command below should be the version number of the radical.saga module):
python -c "import radical.saga as rs; print(rs.version)"
Part 2: Local Job Submission¶
One of the most important features of RADICAL-SAGA is the capability to submit jobs to local and remote queueing systems and resource managers. This first example explains how to define a SAGA job using the Job API and run it on your local machine.
If you are somewhat familiar with Python and the principles of distributed computing, the Hands-On code example is probably all you want to know. The code is relatively simple and pretty self-explanatory. If you have questions about the code or if you want to know in detail what’s going on, read the Details and Discussion section further below.
Hands-On: Local Job Submission¶
Before we discuss the individual API calls in more detail, let’s get down and dirty and run our first example: creating and running a SAGA job on your local machine.

Create a new file saga_example_local.py and paste the following code:
#!/usr/bin/env python

__author__    = "Ole Weidner"
__copyright__ = "Copyright 2012-2013, The SAGA Project"
__license__   = "MIT"

import sys

import radical.saga as rs


def main():

    try:
        # Create a job service object that represents the local machine.
        # The keyword 'fork://' in the url scheme triggers the 'shell' adaptor
        # which can execute jobs on the local machine as well as on a remote
        # machine via "ssh://hostname".
        js = rs.job.Service("fork://localhost")

        # Next, we describe the job we want to run. A complete set of job
        # description attributes can be found in the API documentation.
        jd = rs.job.Description()
        jd.environment = {'MYOUTPUT': '"Hello from SAGA"'}
        jd.executable  = '/bin/echo'
        jd.arguments   = ['$MYOUTPUT']
        jd.output      = "mysagajob.stdout"
        jd.error       = "mysagajob.stderr"

        # Create a new job from the job description. The initial state of
        # the job is 'New'.
        myjob = js.create_job(jd)

        # Check our job's id and state
        print("Job ID    : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))

        print("\n...starting job...\n")

        # Now we can start our job.
        myjob.run()

        print("Job ID    : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))

        print("\n...waiting for job...\n")

        # wait for the job to either finish or fail
        myjob.wait()

        print("Job State : %s" % (myjob.state))
        print("Exitcode  : %s" % (myjob.exit_code))

        return 0

    except rs.SagaException as ex:
        # Catch all saga exceptions
        print("An exception occurred: (%s) %s " % (ex.type, (str(ex))))
        # Trace back the exception. That can be helpful for debugging.
        print(" \n*** Backtrace:\n %s" % ex.traceback)
        return -1


if __name__ == "__main__":
    sys.exit(main())
Run the Code¶
Save the file and execute it (make sure your virtualenv is activated):
python saga_example_local.py
The output should look something like this:
Job ID : [fork://localhost]-[None]
Job State : rs.job.Job.New
...starting job...
Job ID : [fork://localhost]-[644240]
Job State : rs.job.Job.Pending
...waiting for job...
Job State : rs.job.Job.Done
Exitcode : None
Check the Output¶
Once the job has completed, you will find a file mysagajob.stdout in your current working directory. It should contain the line:
Hello from SAGA
A Quick Note on Logging and Debugging¶
Since working with distributed systems is inherently complex and much of the
complexity is hidden within RADICAL-SAGA, it is necessary to do a lot of internal
logging. By default, logging output is disabled, but if something goes wrong or
if you’re just curious, you can enable the logging output by setting the
environment variable SAGA_VERBOSE
to a value between 1 (print only critical
messages) and 5 (print all messages). Give it a try with the above example:
SAGA_VERBOSE=5 python saga_example_local.py
Discussion¶
Now that we have successfully run our first job with radical.saga, we will discuss some of the building blocks and details of the code.
The job submission and management capabilities of radical.saga are packaged in the rs.job module (API Doc). Three classes are defined in this module:
The job.Service class provides a handle to the resource manager, for example a remote PBS cluster.
The job.Description class is used to describe the executable, arguments, environment and requirements (e.g., number of cores, etc.) of a new job.
The job.Job class is a handle to a job associated with a job.Service. It is used to control (start, stop) the job and query its status (e.g., Running, Finished, etc.).
In order to use the SAGA Job API, we first need to import the radical.saga module:
import radical.saga as rs
Next, we create a job.Service object that represents the compute resource you want to use. The job service takes a single URL as parameter. The URL is a way to tell radical.saga what type of resource or middleware you want to use and where it is. The URL parameter is passed to radical.saga’s plug-in selector and, based on the URL scheme, a plug-in is selected. In this case the local job plug-in is selected for fork://. The URL scheme to plug-in mapping is described in the adaptors chapter.
js = rs.job.Service("fork://localhost")
To define a new job, a job.Description object needs to be created that contains information about the executable we want to run, its arguments, the environment that needs to be set and some other optional job requirements:
jd = rs.job.Description()
# environment, executable & arguments
jd.environment = {'MYOUTPUT':'"Hello from SAGA"'}
jd.executable = '/bin/echo'
jd.arguments = ['$MYOUTPUT']
# output options
jd.output = "mysagajob.stdout"
jd.error = "mysagajob.stderr"
Once the job.Service has been created and the job has been defined via the job.Description object, we can create a new instance of the job via the create_job method of the job.Service and use the resulting object to control (start, stop) and monitor the job:
myjob = js.create_job(jd) # create a new job instance
myjob.run() # start the job instance
print("Initial Job ID    : %s" % (myjob.id))
print("Initial Job State : %s" % (myjob.get_state()))

myjob.wait() # Wait for the job to reach either 'Done' or 'Failed' state

print("Final Job ID    : %s" % (myjob.id))
print("Final Job State : %s" % (myjob.get_state()))
Part 3: Remote Job Submission¶
Next, we take the previous example and modify it, so that our job is executed on a remote machine instead of localhost. This examples shows one of the most important capabilities of SAGA: abstracting system heterogeneity. We can use the same code we have used to run a job via ‘fork’ with minimal modifications to run a job on a different resource, e.g., via ‘ssh’ on another remote system or via ‘pbs’ or ‘sge’ on a remote cluster.
Prerequisites¶
This example assumes that you have SSH access to a remote resource, either a single host or an HPC cluster.
The example also assumes that you have a working public/private SSH key-pair and that you can log-in to your remote resource of choice using those keys, i.e., your public key is in the ~/.ssh/authorized_hosts file on the remote machine. If you are not sure how this works, you might want to read about SSH and GSISSH first.
Hands-On: Remote Job Submission¶
Copy the code from the previous example to a new file saga_example_remote.py.

Add an rs.Context and rs.Session right before the job.Service object initialization. Sessions and Contexts describe your SSH identity on the remote machine:
ctx = rs.Context("ssh")
ctx.user_id = "oweidner"

session = rs.Session()
session.add_context(ctx)
To change the execution host for the job, change the URL in the job.Service constructor. If you want to use a remote SSH host, use an ssh:// URL. Note that the session is passed as an additional parameter to the Service constructor:

js = rs.job.Service("ssh://remote.host.net", session=session)
Alternatively, if you have access to a PBS cluster, use a pbs+ssh:// URL:

js = rs.job.Service("pbs+ssh://remote.hpchost.net", session=session)
There are more URL options; have a look at the adaptors chapter for a complete list. If you submit your job to a PBS cluster (pbs+ssh://), you will probably also have to make some modifications to your job.Description. Depending on the configuration of your cluster, you might have to put in the name of the queue you want to use or the allocation or project name that should be credited:

jd = rs.job.Description()
jd.environment = {'MYOUTPUT': '"Hello from SAGA"'}
jd.executable  = '/bin/echo'
jd.arguments   = ['$MYOUTPUT']
jd.output      = "mysagajob.stdout"
jd.error       = "mysagajob.stderr"
jd.queue       = "short"        # Using a specific queue
jd.project     = "TG-XYZABCX"   # Example for an XSEDE/TeraGrid allocation
Run the Code¶
Save the file and execute it (make sure your virtualenv is activated):
python saga_example_remote.py
The output should look something like this:
Job ID : None
Job State : New
...starting job...
Job ID : [ssh://gw68.quarry.iu.teragrid.org]-[18533]
Job State : Done
...waiting for job...
Job State : Done
Exitcode : 0
Values marked as ‘None’ could not be fetched from the backend at that point.
Check the Output¶
As opposed to the previous “local” example, you won’t find a mysagajob.stdout file in your working directory. This is because the file has been created on the remote host where your job was executed. In order to check the content, you would have to log in to the remote machine. We will address this issue in the next example.
Discussion¶
Besides changing the job.Service URL to trigger a different middleware plug-in, we have introduced another new aspect in this tutorial example: Contexts. Contexts are used to define security / log-in contexts for SAGA objects and are passed to the executing plug-in (e.g., the SSH plug-in). A context always has a type that matches the executing plug-in. The two most commonly used contexts in SAGA are ssh and gsissh:

# Your ssh identity on the remote machine
ctx = rs.Context("ssh")
ctx.user_id = "oweidner"
A Context can’t be used by itself, but rather has to be added to an rs.Session object. A session can have one or more Contexts. At runtime, RADICAL-SAGA will iterate over all Contexts of a Session to see if any of them can be used to establish a connection.

session = rs.Session()
session.add_context(ctx)
Finally, Sessions are passed as an extra parameter during object creation, otherwise they won’t be considered:

js = rs.job.Service("ssh://remote.host.net", session=session)
The complete API documentation for Session and Context classes can be found in the Library Reference section of this manual.
Part 4: Adding File Transfer¶
In this fourth part of the tutorial, we again build on the previous example and add some code that copies our job’s output file back to the local machine. This is done using the rs.filesystem API package.
Prerequisites¶
This example assumes that you have SFTP access to the remote resource that you have used in the previous example. Again, this example assumes that you have a working public/private SSH key-pair and that you can sftp into your remote resource using those keys, i.e., your public key is in the ~/.ssh/authorized_hosts file on the remote machine. If you are not sure how this works, you might want to read about SSH and GSISSH first.
Hands-On: Remote Job Submission with File Staging¶
Copy the code from the previous example 3 to a new file saga_example_remote_staging.py.
Add the following code after the last print, right before the except statement:
Note
Make sure that you adjust the paths to reflect your home directory on the remote machine.
outfilesource = 'sftp://gw68.quarry.iu.teragrid.org/users/oweidner/mysagajob.stdout'
outfiletarget = 'file://localhost/tmp/'

out = rs.filesystem.File(outfilesource, session=session)
out.copy(outfiletarget)

print("Staged out %s to %s (size: %s bytes)" % (outfilesource, outfiletarget, out.get_size()))
Run the Code¶
Save the file and execute it (make sure your virtualenv is activated):
python saga_example_remote_staging.py
The output should look something like this:
Job ID : None
Job State : New
...starting job...
Job ID : [ssh://gw68.quarry.iu.teragrid.org]-[18533]
Job State : Done
...waiting for job...
Job State : Done
Exitcode : 0
Staged out gw68.quarry.iu.teragrid.org/users/oweidner/mysagajob.stdout to file://localhost/tmp/ (size: 16 bytes)
Check the Output¶
Your output file should now be in /tmp/mysagajob.stdout and contain the
string Hello from SAGA
.
Coding URLs¶
The URLs sftp://username:password@hostname:port//some/path and sftp://username:password@hostname:port/some/path (note the two // in the first URL) are supposed to be the same. However, SAGA may treat them differently due to some missing path normalization. URLs should be coded with a single ‘/’ or composed as follows:
remote_dir_url          = rs.Url()
remote_dir_url.scheme   = 'sftp'
remote_dir_url.username = username
remote_dir_url.password = password
remote_dir_url.host     = hostname
remote_dir_url.port     = port
remote_dir_url.path     = '/some/path'
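The ambiguity can be observed with Python's stdlib urllib.parse, independently of SAGA: the two spellings share the same authority but produce different path components, which a library without path normalization may treat as different locations. The host and user names below are placeholders.

```python
from urllib.parse import urlparse

a = urlparse('sftp://user@host//some/path')
b = urlparse('sftp://user@host/some/path')

# The authority part is identical; only the path differs.
assert a.netloc == b.netloc == 'user@host'
assert a.path == '//some/path'
assert b.path == '/some/path'
assert a.path != b.path   # hence the two URLs may not compare equal
```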
Part 5: A More Complex Example: Mandelbrot¶
Warning
If you want to run the Mandelbrot example on OSG with Condor, please refer to the OSG-specific instructions: Part 5: A More Complex Example: Mandelbrot (OSG VERSION).
In this example, we split up the calculation of a Mandelbrot set into several tiles, submit a job for each tile using the SAGA Job API, retrieve the tiles using the SAGA File API and stitch together the final image from the individual tiles. This example shows how SAGA can be used to create more complex application workflows that involve multiple aspects of the API.
Hands-On: Distributed Mandelbrot Fractals¶
In order for this example to work, we need to install an additional Python module, the Python Image Library (PIL). This is done via pip:
pip install Pillow
Next, we need to download the Mandelbrot fractal generator itself as well as the shell wrapper script. It is really just a very simple Python script that, if invoked on the command line, outputs a full or partial Mandelbrot fractal as an image file. Download the scripts into your $HOME directory:
curl --insecure -Os https://raw.githubusercontent.com/radical-cybertools/radical.saga/devel/examples/tutorial/mandelbrot/mandelbrot.py
curl --insecure -Os https://raw.githubusercontent.com/radical-cybertools/radical.saga/devel/examples/tutorial/mandelbrot/mandelbrot.sh
You can give mandelbrot.py a test-drive locally by calculating a single-tiled 1024x1024 Mandelbrot fractal:
python mandelbrot.py 1024 1024 0 1024 0 1024 frac.gif
In your $HOME directory, open a new file saga_mandelbrot.py with your favorite editor and paste the following script:
__author__    = "Ole Weidner"
__copyright__ = "Copyright 2012-2013, The SAGA Project"
__license__   = "MIT"

import os
import sys
import time

from PIL import Image

import radical.saga as rs

# -----------------------------------------------------------------------------
#
# Change REMOTE_HOST to the machine you want to run this on.
# You might have to change the URL scheme below for REMOTE_JOB_ENDPOINT
# accordingly.
REMOTE_HOST = "localhost"  # try this with different hosts

# This refers to your working directory on 'REMOTE_HOST'. If you use a
# cluster for 'REMOTE_HOST', make sure this points to a shared filesystem.
REMOTE_DIR = "/tmp/"  # change this to your home directory

# If you change 'REMOTE_HOST' above, you might have to change 'ssh://' to e.g.,
# 'pbs+ssh://', 'sge+ssh://', depending on the type of service endpoint on
# that particular host.
REMOTE_JOB_ENDPOINT = "ssh://" + REMOTE_HOST

# At the moment radical.saga only provides an sftp file adaptor, so changing
# the URL scheme here wouldn't make any sense.
REMOTE_FILE_ENDPOINT = "sftp://" + REMOTE_HOST + "/" + REMOTE_DIR

# the dimension (in pixels) of the whole fractal
imgx = 2048
imgy = 2048

# the number of tiles in X and Y direction
tilesx = 2
tilesy = 2


# -----------------------------------------------------------------------------
#
if __name__ == "__main__":

    try:
        # Your ssh identity on the remote machine
        ctx = rs.Context("ssh")
        # ctx.user_id = ""

        session = rs.Session()
        session.add_context(ctx)

        # list that holds the jobs
        jobs = []

        # create a working directory on the remote host
        dirname = '%s/mbrot/' % (REMOTE_FILE_ENDPOINT)
        workdir = rs.filesystem.Directory(dirname, rs.filesystem.CREATE,
                                          session=session)

        # copy the executable and wrapper script to the remote host
        mbwrapper = rs.filesystem.File('file://localhost/%s/mandelbrot.sh' % os.getcwd())
        mbwrapper.copy(workdir.get_url())
        mbexe = rs.filesystem.File('file://localhost/%s/mandelbrot.py' % os.getcwd())
        mbexe.copy(workdir.get_url())

        # the saga job service connects to and provides a handle
        # to a remote machine. In this case, it's your machine.
        # fork can be replaced with ssh here:
        jobservice = rs.job.Service(REMOTE_JOB_ENDPOINT, session=session)

        for x in range(0, tilesx):
            for y in range(0, tilesy):

                # describe a single Mandelbrot job. we're using the
                # directory created above as the job's working directory
                outputfile = 'tile_x%s_y%s.gif' % (x, y)

                jd = rs.job.Description()
                # jd.queue           = "development"
                jd.wall_time_limit   = 10
                jd.total_cpu_count   = 1
                jd.working_directory = workdir.get_url().path
                jd.executable        = 'sh'
                jd.arguments         = ['mandelbrot.sh', imgx, imgy,
                                        int(imgx/tilesx*x), int(imgx/tilesx*(x+1)),
                                        int(imgy/tilesy*y), int(imgy/tilesy*(y+1)),
                                        outputfile]

                # create the job from the description
                # above, launch it and add it to the list of jobs
                job = jobservice.create_job(jd)
                job.run()
                jobs.append(job)
                print(' * Submitted %s. Output will be written to: %s' % (job.id, outputfile))

        # wait for all jobs to finish
        while len(jobs) > 0:
            for job in jobs:
                jobstate = job.get_state()
                print(' * Job %s status: %s' % (job.id, jobstate))
                if jobstate in [rs.job.DONE, rs.job.FAILED]:
                    jobs.remove(job)
            print("")
            time.sleep(5)

        # copy image tiles back to our 'local' directory
        for image in workdir.list('*.gif'):
            print(' * Copying %s/%s/%s back to %s' % (REMOTE_FILE_ENDPOINT,
                                                      workdir.get_url(),
                                                      image, os.getcwd()))
            workdir.copy(image, 'file://localhost/%s/' % os.getcwd())

        # stitch together the final image
        fullimage = Image.new('RGB', (imgx, imgy), (255, 255, 255))
        print(' * Stitching together the whole fractal: mandelbrot_full.gif')
        for x in range(0, tilesx):
            for y in range(0, tilesy):
                partimage = Image.open('tile_x%s_y%s.gif' % (x, y))
                fullimage.paste(partimage,
                                (int(imgx/tilesx*x), int(imgy/tilesy*y),
                                 int(imgx/tilesx*(x+1)), int(imgy/tilesy*(y+1))))
        fullimage.save("mandelbrot_full.gif", "GIF")
        sys.exit(0)

    except rs.SagaException as ex:
        # Catch all saga exceptions
        print("An exception occurred: (%s) %s " % (ex.type, (str(ex))))
        # Trace back the exception. That can be helpful for debugging.
        print(" \n*** Backtrace:\n %s" % ex.traceback)
        sys.exit(-1)

    except KeyboardInterrupt:
        # ctrl-c caught: try to cancel our jobs before we exit
        # the program, otherwise we'll end up with lingering jobs.
        for job in jobs:
            job.cancel()
        sys.exit(-1)
Look at the code and change the constants at the very top accordingly. Then run it. The output should look something like this:
python saga_mandelbrot.py
* Submitted [ssh://india.futuregrid.org]-[4073]. Output will be written to: tile_x0_y0.gif
* Submitted [ssh://india.futuregrid.org]-[4094]. Output will be written to: tile_x0_y1.gif
* Submitted [ssh://india.futuregrid.org]-[4116]. Output will be written to: tile_x1_y0.gif
* Submitted [ssh://india.futuregrid.org]-[4144]. Output will be written to: tile_x1_y1.gif
* Job [ssh://india.futuregrid.org]-[4073] status: Running
* Job [ssh://india.futuregrid.org]-[4094] status: Running
* Job [ssh://india.futuregrid.org]-[4116] status: Running
* Job [ssh://india.futuregrid.org]-[4144] status: Running
* Job [ssh://india.futuregrid.org]-[4073] status: Running
* Job [ssh://india.futuregrid.org]-[4094] status: Running
* Job [ssh://india.futuregrid.org]-[4116] status: Running
* Job [ssh://india.futuregrid.org]-[4144] status: Running
* Job [ssh://india.futuregrid.org]-[4073] status: Done
* Job [ssh://india.futuregrid.org]-[4116] status: Running
* Job [ssh://india.futuregrid.org]-[4144] status: Running
* Job [ssh://india.futuregrid.org]-[4094] status: Done
* Job [ssh://india.futuregrid.org]-[4144] status: Done
* Job [ssh://india.futuregrid.org]-[4116] status: Done
* Copying sftp://india.futuregrid.org//N/u/oweidner/sftp://india.futuregrid.org//N/u/oweidner/mbrot//tile_x0_y0.gif back to /Users/oweidner/MB
* Copying sftp://india.futuregrid.org//N/u/oweidner/sftp://india.futuregrid.org//N/u/oweidner/mbrot//tile_x0_y1.gif back to /Users/oweidner/MB
* Copying sftp://india.futuregrid.org//N/u/oweidner/sftp://india.futuregrid.org//N/u/oweidner/mbrot//tile_x1_y0.gif back to /Users/oweidner/MB
* Copying sftp://india.futuregrid.org//N/u/oweidner/sftp://india.futuregrid.org//N/u/oweidner/mbrot//tile_x1_y1.gif back to /Users/oweidner/MB
* Stitching together the whole fractal: mandelbrot_full.gif
Open mandelbrot_full.gif with your favorite image editor. It should look like the image below. The different tile*.gif files (open them if you want) were computed on ‘REMOTE_HOST’, transferred back and stitched together as the full image.

Library Reference¶
Intro library reference…
URLs¶
Url – radical.saga.Url¶
- class radical.saga.Url(url_in='')[source]¶
The SAGA Url class.
URLs are used in several places in the SAGA API: to specify service endpoints for job submission or resource management, for file or directory locations, etc.
The URL class is designed to simplify URL management for these purposes – it allows to manipulate individual URL elements, while ensuring that the resulting URL is well formatted. Example:
# create a URL from a string
location = saga.Url("file://localhost/tmp/file.dat")
d = saga.filesystem.Directory(location)
A URL consists of the following components (where one or more can be ‘None’):
<scheme>://<user>:<pass>@<host>:<port>/<path>?<query>#<fragment>
Each of these components can be accessed via its property or alternatively, via getter / setter methods. Example:
url = saga.Url("scheme://user:pass@host:123/path?query#fragment")
# modify the scheme
url.scheme = "anotherscheme"
# the above is equivalent to
url.set_scheme("anotherscheme")
Job Submission and Control¶
SAGA’s job management module is central to the API. It represents an application/executable running under the management of a resource manager. A resource manager can be anything from the local machine to a remote HPC queueing system to grid and cloud computing services.
The basic usage of the job module is as follows:
# A job.Description object describes the executable/application and its requirements
job_desc = radical.saga.job.Description()
job_desc.executable = '/bin/sleep'
job_desc.arguments = ['10']
job_desc.output = 'myjob.out'
job_desc.error = 'myjob.err'
# A job.Service object represents the resource manager. In this example we use the 'local' adaptor to represent the local machine
service = radical.saga.job.Service('local://localhost')
# A job is created on a service (resource manager) using the job description
job = service.create_job(job_desc)
# Run the job and wait for it to finish
job.run()
print("Job ID    : %s" % (job.job_id))
job.wait()
# Get some info about the job
print("Job State : %s" % (job.state))
print("Exitcode  : %s" % (job.exit_code))
service.close()
See also
More examples can be found in the individual adaptor sections!
Like all SAGA modules, the job module relies on middleware adaptors
to provide bindings to a specific resource manager. Adaptors are implicitly
selected via the scheme part of the URL, e.g., local://
in the example
above selects the local job adaptor. The Job Service – radical.saga.job.Service section explains
this in more detail.
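The scheme-based adaptor selection can be sketched as a simple lookup keyed on the scheme part of the resource manager URL. The mapping below is purely illustrative – the real adaptor registry lives inside radical.saga and is more elaborate:

```python
from urllib.parse import urlparse

# Hypothetical scheme-to-adaptor table, for illustration only.
ADAPTORS = {
    "local": "shell job adaptor (local)",
    "fork" : "shell job adaptor (local)",
    "ssh"  : "shell job adaptor (ssh)",
    "slurm": "slurm job adaptor",
}

def select_adaptor(rm_url):
    # the scheme part of the URL drives the selection
    scheme = urlparse(rm_url).scheme
    return ADAPTORS.get(scheme, "no adaptor registered for '%s'" % scheme)

print(select_adaptor("local://localhost"))   # shell job adaptor (local)
```

With `local://localhost` the scheme `local` resolves to the local shell job adaptor; an unknown scheme would yield no match and, in the real library, raise an error at Service creation time.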
Note
A list of available adaptors and supported resource managers can be found in the Developer Documentation part of this documentation.
The rest of this section is structured as follows:
Job Service – radical.saga.job.Service¶
- class radical.saga.job.Service(rm=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
The job.Service represents a resource management backend, and as such allows the creation, submission and management of jobs.
A job.Service represents anything which accepts job creation requests, and which manages thus created
saga.job.Job
instances. That can be a local shell, a remote ssh shell, a cluster queuing system, an IaaS backend – you name it. The job.Service is identified by a URL, which usually points to the contact endpoint for that service.
Example:
service = saga.job.Service("fork://localhost")
ids = service.list()
for job_id in ids:
    print(job_id)
    j = service.get_job(job_id)
    if j.get_state() == saga.job.Job.Pending:
        print("pending")
    elif j.get_state() == saga.job.Job.Running:
        print("running")
    else:
        print("job is already final!")
service.close()
- __init__(rm, session)[source]¶
Create a new job.Service instance.
- Parameters:
rm (string or
saga.Url
) – resource manager URL
session (
saga.Session
) – an optional session object with security contexts
- Return type:
saga.job.Service
- close()[source]¶
Close the job service instance and disconnect from the (remote) job service if necessary. Any subsequent calls to a job service instance after close() was called will fail.
Example:
service = saga.job.Service("fork://localhost")
# do something with the 'service' object, create jobs, etc...
service.close()
service.list()  # this call will throw an exception
Warning
While in principle the job service destructor calls close() automatically when a job service instance goes out of scope, you shouldn’t rely on it. Python’s garbage collection can be a bit odd at times, so you should always call close() explicitly. Especially in a multi-threaded program this will help to avoid random errors.
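The warning above suggests a defensive pattern: place close() in a finally clause so it runs even when job submission raises. The sketch below uses a stand-in service class so the pattern is self-contained; with radical.saga the object would be a radical.saga.job.Service:

```python
# Stand-in for a job service, used only to illustrate the close() pattern.
class DummyService:
    def __init__(self):
        self.closed = False
    def submit(self):
        raise RuntimeError("submission failed")
    def close(self):
        self.closed = True

service = DummyService()
try:
    service.submit()          # may raise at any point ...
except RuntimeError:
    pass                      # ... handle or log the failure ...
finally:
    service.close()           # ... but close() always runs

print(service.closed)         # True
```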
- create_job(job_desc)[source]¶
Create a new job.Job instance from a
Description
. The resulting job instance is in NEW state.
- Parameters:
job_desc (
saga.job.Description
) – job description to create the job from
- Return type:
saga.job.Job
create_job() accepts a job description, which describes the application instance to be created by the backend. The create_job() method does not actually attempt to run the job; it merely parses the job description for syntactic and semantic consistency. The returned job object is thus not in ‘Pending’ or ‘Running’, but in ‘New’ state. The actual submission is performed by calling run() on the job object.
Example:
# A job.Description object describes the executable/application
# and its requirements
job_desc = saga.job.Description()
job_desc.executable = '/bin/sleep'
job_desc.arguments = ['10']
job_desc.output = 'myjob.out'
job_desc.error = 'myjob.err'

service = saga.job.Service('local://localhost')
job = service.create_job(job_desc)

# Run the job and wait for it to finish
job.run()
print("Job ID    : %s" % (job.job_id))
job.wait()

# Get some info about the job
print("Job State : %s" % (job.state))
print("Exitcode  : %s" % (job.exit_code))

service.close()
- get_job(job_id)[source]¶
Return the job object for a given job id.
- Parameters:
job_id – The id of the job to retrieve
- Return type:
saga.job.Job
Job objects are a local representation of a remote stateful entity. The job.Service supports reconnecting to those remote entities:
service = saga.job.Service("fork://localhost")
j = service.get_job(my_job_id)
if j.get_state() == saga.job.Job.Pending:
    print("pending")
elif j.get_state() == saga.job.Job.Running:
    print("running")
else:
    print("job is already final!")
service.close()
- get_url()[source]¶
Return the URL this Service instance was created with.
See also
The
url
property and the get_url()
method are semantically equivalent and only duplicated for convenience.
- property jobs¶
list()
Return a list of the jobs that are managed by this Service instance.
See also
The
jobs
property and the list()
method are semantically equivalent.
- Return type:
list of
saga.job.Job
As the job.Service represents a job management backend, list() will return a list of job IDs for all jobs which are known to the backend, and which can potentially be accessed and managed by the application.
Example:
service = saga.job.Service("fork://localhost")
ids = service.list()
for job_id in ids:
    print(job_id)
service.close()
- list()[source]¶
Return a list of the jobs that are managed by this Service instance.
See also
The
jobs
property and the list()
method are semantically equivalent.
- Return type:
list of
saga.job.Job
As the job.Service represents a job management backend, list() will return a list of job IDs for all jobs which are known to the backend, and which can potentially be accessed and managed by the application.
Example:
service = saga.job.Service("fork://localhost")
ids = service.list()
for job_id in ids:
    print(job_id)
service.close()
- property url¶
get_url()
Return the URL this Service instance was created with.
See also
The
url
property and the get_url()
method are semantically equivalent and only duplicated for convenience.
Job Description – radical.saga.job.Description¶
Warning: There is no guarantee that all middleware adaptors implement all job
description attributes. In case a specific attribute is not supported, the
create_job()
call will throw an exception. Please refer to
the Developer Documentation section for more details and
adaptor-specific lists of supported attributes.
- class radical.saga.job.Description[source]¶
Bases:
Attributes
The job description class.
SAGA defines the following constants as valid job description attributes:
- radical.saga.job.EXECUTABLE¶
The executable to start once the job starts running:
jd = radical.saga.job.Description()
jd.executable = "/bin/sleep"
- Type:
str
- radical.saga.job.executable¶
Same as attribute
EXECUTABLE
.
- radical.saga.job.ARGUMENTS¶
Arguments to pass to the
EXECUTABLE
:
jd = radical.saga.job.Description()
jd.arguments = ['--flag1', '--flag2']
- Type:
list()
- radical.saga.job.ENVIRONMENT¶
Environment variables to set in the job’s context:
jd = radical.saga.job.Description()
jd.environment = {'FOO': 'BAR', 'FREE': 'BEER'}
- Type:
dict()
- radical.saga.job.environment¶
Same as attribute
ENVIRONMENT
.
- radical.saga.job.WORKING_DIRECTORY¶
The working directory of the job:
jd = radical.saga.job.Description()
jd.working_directory = "/scratch/experiments/123/"
- Type:
str()
- radical.saga.job.working_directory¶
Same as attribute
WORKING_DIRECTORY
.
- radical.saga.job.OUTPUT¶
Filename to capture the executable’s STDOUT stream. If
output
is a relative filename, the file is relative to WORKING_DIRECTORY:
jd = radical.saga.job.Description()
jd.output = "myjob_stdout.txt"
- Type:
str()
- radical.saga.job.ERROR¶
Filename to capture the executable’s STDERR stream. If
error
is a relative filename, the file is relative to WORKING_DIRECTORY:
jd = radical.saga.job.Description()
jd.error = "myjob_stderr.txt"
- Type:
str()
- radical.saga.job.FILE_TRANSFER¶
Files to stage-in before the job starts running and to stage out once the job has finished running. The syntax is as follows:
local_file OPERATOR remote_file
OPERATOR
can be one of the following:
> copies the local file to the remote file before the job starts. Overwrites the remote file if it exists.
< copies the remote file to the local file after the job finishes. Overwrites the local file if it exists.
Example:
jd = radical.saga.job.Description()
jd.file_transfer = ["file://localhost/data/input/test.dat > test.dat",
                    "file://localhost/data/results/1/result.dat < result1.dat"]
- Type:
list()
- radical.saga.job.file_transfer¶
Same as attribute
FILE_TRANSFER
.
- radical.saga.job.QUEUE¶
The name of the queue to submit the job to:
jd = radical.saga.job.Description()
jd.queue = "mpi_long"
- Type:
str()
- radical.saga.job.PROJECT¶
The name of the project / allocation to be charged for the job:
jd = radical.saga.job.Description()
jd.project = "TG-XYZ123456"
- Type:
str()
- radical.saga.job.SPMD_VARIATION¶
The SPMD variation of the job, i.e., the parallel programming model under which its processes are launched (e.g., ‘MPI’ or ‘OpenMP’); supported values are adaptor specific.
- Type:
str()
- radical.saga.job.spmd_variation¶
(Property) Same as attribute
SPMD_VARIATION
.
- radical.saga.job.TOTAL_CPU_COUNT = 'TotalCPUCount'¶
- radical.saga.job.total_cpu_count¶
(Property) Same as attribute
TOTAL_CPU_COUNT
.- Type:
int() or str()
- radical.saga.job.TOTAL_GPU_COUNT = 'TotalGPUCount'¶
- radical.saga.job.total_gpu_count¶
(Property) Same as attribute
TOTAL_GPU_COUNT
.- Type:
int() or str()
- radical.saga.job.NUMBER_OF_PROCESSES = 'NumberOfProcesses'¶
- radical.saga.job.number_of_processes¶
(Property) Same as attribute
NUMBER_OF_PROCESSES
.- Type:
int() or str()
- radical.saga.job.PROCESSES_PER_HOST = 'ProcessesPerHost'¶
- radical.saga.job.processes_per_host¶
(Property) Same as attribute
PROCESSES_PER_HOST
.- Type:
int() or str()
- radical.saga.job.THREADS_PER_PROCESS = 'ThreadsPerProcess'¶
- radical.saga.job.threads_per_process¶
(Property) Same as attribute
THREADS_PER_PROCESS
.- Type:
int() or str()
- radical.saga.job.CLEANUP = 'Cleanup'¶
- radical.saga.job.JOB_START_TIME = 'JobStartTime'¶
- radical.saga.job.job_start_time¶
(Property) Same as attribute
JOB_START_TIME
.- Type:
UNIX timestamp
- radical.saga.job.WALL_TIME_LIMIT = 'WallTimeLimit'¶
- radical.saga.job.wall_time_limit¶
(Property) Same as attribute
WALL_TIME_LIMIT
.
- radical.saga.job.TOTAL_PHYSICAL_MEMORY = 'TotalPhysicalMemory'¶
- radical.saga.job.total_physical_memory¶
(Property) Same as attribute
TOTAL_PHYSICAL_MEMORY
.
- radical.saga.job.SYSTEM_ARCHITECTURE = 'SystemArchitecture'¶
- radical.saga.job.system_architecture¶
(Property) Same as attribute
SYSTEM_ARCHITECTURE
.- Type:
dict()
- radical.saga.job.OPERATING_SYSTEM_TYPE = 'OperatingSystemType'¶
- radical.saga.job.operating_system_type¶
(Property) Same as attribute
OPERATING_SYSTEM_TYPE
.
- radical.saga.job.CANDIDATE_HOSTS = 'CandidateHosts'¶
- radical.saga.job.candidate_hosts¶
(Property) Same as attribute
CANDIDATE_HOSTS
.
- radical.saga.job.JOB_CONTACT = 'JobContact'¶
- radical.saga.job.job_contact¶
(Property) Same as attribute
JOB_CONTACT
.
Jobs – radical.saga.job.Job¶
- class radical.saga.job.Job(_method_type='run', _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Base, Task, Async
Represents a SAGA job as defined in GFD.90
A ‘Job’ represents a running application instance, which may consist of one or more processes. Jobs are created by submitting a Job description to a Job submission system – usually a queuing system, or some other service which spawns jobs on the user’s behalf.
Jobs have a unique ID (see get_job_id()), and are stateful entities – their ‘state’ attribute changes according to a well defined state model:
A job as returned by job.Service.create_job(jd) is in ‘New’ state – it is not yet submitted to the job submission backend. Once it has been submitted via run(), it enters the ‘Pending’ state, where it waits to actually get executed by the backend (e.g., waiting in a queue). Once the job is actually executed, it enters the ‘Running’ state – only in that state is the job actually consuming resources (CPU, memory, …).
Jobs can leave the ‘Running’ state in three different ways: they finish successfully on their own (‘Done’), they finish unsuccessfully on their own, or get canceled by the job management backend (‘Failed’), or they get actively canceled by the user or the application (‘Canceled’).
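The state model above can be sketched as a small transition table. The state names match the radical.saga.job constants, but the table itself is an illustration, not code taken from the library:

```python
# State names as defined by radical.saga.job; the transition table below
# is an illustrative sketch of the documented state model.
NEW, PENDING, RUNNING = 'New', 'Pending', 'Running'
DONE, FAILED, CANCELED = 'Done', 'Failed', 'Canceled'

TRANSITIONS = {
    NEW:      {PENDING},                    # run() submits the job
    PENDING:  {RUNNING, FAILED, CANCELED},  # waiting to get executed
    RUNNING:  {DONE, FAILED, CANCELED},     # the three ways to leave Running
    DONE:     set(),                        # final states have no successors
    FAILED:   set(),
    CANCELED: set(),
}

def is_final(state):
    # a state is final if no further transition is possible
    return not TRANSITIONS[state]

print(is_final(RUNNING))   # False
print(is_final(DONE))      # True
```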
The methods defined on the Job object serve two purposes: inspecting the job’s state, and initiating job state transitions.
- property ExitCode¶
The job’s exitcode.
This attribute is only meaningful if the job is in ‘Done’ or another final state – for all other job states, its value is undefined.
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
j.run()
j.wait()
if j.get_state() == saga.job.FAILED:
    if j.exitcode == "42":
        print("Ah, galaxy bypass error!")
    else:
        print("oops!")
- property JobID¶
The job’s identifier.
This attribute is equivalent to the value returned by job.get_job_id()
- property ServiceURL¶
The URL of the
saga.job.Service
instance managing this job. This attribute represents the URL under which the job management service that owns the job can be contacted. The value is equivalent to the service part of the job_id.
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.serviceurl == "fork://localhost":
    print("yes!")
else:
    print("oops!")
- cancel(timeout)[source]¶
Cancel the execution of the job.
- Parameters:
timeout (float) – cancel will return after timeout
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.get_state() == saga.job.NEW:
    print("new")
else:
    print("oops!")
j.run()
if j.get_state() == saga.job.PENDING:
    print("pending")
elif j.get_state() == saga.job.RUNNING:
    print("running")
else:
    print("oops!")
j.cancel()
if j.get_state() == saga.job.CANCELED:
    print("canceled")
else:
    print("oops!")
- property description¶
get_description()
Return the job description this job was created from.
The returned description can be used to inspect job properties (executable name, arguments, etc.). It can also be used to start identical job instances.
The returned job description will in general reflect the actual state of the running job, and is not necessarily a simple copy of the job description which was used to create the job instance. For example, the environment variables in the returned job description may reflect the actual environment of the running job instance.
Example:
service = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j1 = service.create_job(jd)
j1.run()
j2 = service.create_job(j1.get_description())
j2.run()
service.close()
- get_description()[source]¶
Return the job description this job was created from.
The returned description can be used to inspect job properties (executable name, arguments, etc.). It can also be used to start identical job instances.
The returned job description will in general reflect the actual state of the running job, and is not necessarily a simple copy of the job description which was used to create the job instance. For example, the environment variables in the returned job description may reflect the actual environment of the running job instance.
Example:
service = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j1 = service.create_job(jd)
j1.run()
j2 = service.create_job(j1.get_description())
j2.run()
service.close()
- get_log(ttype=None)[source]¶
get_log_string()
Return the job’s log information, i.e., backend-specific log messages which have been collected during job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is otherwise undefined.
ttype: saga.task.type enum
ret: string / saga.Task
- get_log_string()[source]¶
Return the job’s log information, i.e., backend-specific log messages which have been collected during job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is otherwise undefined.
ttype: saga.task.type enum
ret: string / saga.Task
THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_log() INSTEAD.
- get_state()[source]¶
Return the current state of the job.
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.get_state() == saga.job.NEW:
    print("new")
else:
    print("oops!")
j.run()
if j.get_state() == saga.job.PENDING:
    print("pending")
elif j.get_state() == saga.job.RUNNING:
    print("running")
else:
    print("oops!")
- get_stderr_string()[source]¶
Return the job’s STDERR.
ttype: saga.task.type enum
ret: string / saga.Task
THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_stderr() INSTEAD.
- get_stdout_string()[source]¶
Return the job’s STDOUT as string.
ttype: saga.task.type enum
ret: string / saga.Task
THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE. USE job.get_stdout() INSTEAD.
- property id¶
get_id()
Return the job ID.
- property log¶
get_log_string()
Return the job’s log information, i.e., backend-specific log messages which have been collected during job execution. Those messages also include stdout/stderr from the job’s pre- and post-exec. The returned string generally contains one log message per line, but the format of the string is otherwise undefined.
ttype: saga.task.type enum
ret: string / saga.Task
- property name¶
get_name()
Return the job name.
- run()[source]¶
Run (start) the job.
Request that the job be executed by the backend. If the backend accepts the run request, the job will move to the ‘Pending’ or ‘Running’ state – otherwise this method will raise an error and the job will be moved to ‘Failed’.
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.get_state() == saga.job.NEW:
    print("new")
else:
    print("oops!")
j.run()
if j.get_state() == saga.job.PENDING:
    print("pending")
elif j.get_state() == saga.job.RUNNING:
    print("running")
else:
    print("oops!")
- property state¶
get_state()
Return the current state of the job.
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.get_state() == saga.job.NEW:
    print("new")
else:
    print("oops!")
j.run()
if j.get_state() == saga.job.PENDING:
    print("pending")
elif j.get_state() == saga.job.RUNNING:
    print("running")
else:
    print("oops!")
- wait(timeout)[source]¶
- Parameters:
timeout (float) – wait will return after timeout
Wait for a running job to finish execution.
The optional timeout parameter specifies the time to wait, and accepts the following values:
timeout < 0 : wait forever (block) -- same for 'None' timeout == 0 : wait not at all (non-blocking test) timeout > 0 : wait for 'timeout' seconds
On a non-negative timeout, the call can thus return even if the job is not in final state, and the application should check the actual job state. The default timeout value is ‘None’ (blocking).
Example:
js = saga.job.Service("fork://localhost")
jd = saga.job.Description()
jd.executable = '/bin/date'
j = js.create_job(jd)
if j.get_state() == saga.job.NEW:
    print("new")
else:
    print("oops!")
j.run()
if j.get_state() == saga.job.PENDING:
    print("pending")
elif j.get_state() == saga.job.RUNNING:
    print("running")
else:
    print("oops!")
j.wait(-1.0)
if j.get_state() == saga.job.DONE:
    print("done")
elif j.get_state() == saga.job.FAILED:
    print("failed")
else:
    print("oops!")
Attributes¶
- radical.saga.job.ID = 'ID'¶
- radical.saga.job.EXECUTION_HOSTS = 'ExecutionHosts'¶
- radical.saga.job.CREATED = 'Created'¶
- radical.saga.job.STARTED = 'Started'¶
- radical.saga.job.FINISHED = 'Finished'¶
- radical.saga.job.EXIT_CODE = 'ExitCode'¶
States¶
The job state constants describe the possible states a job can be in, and can be used to check / compare the state of a job. For example:
if job.state == radical.saga.job.Pending:
# do_something
elif job.state == radical.saga.job.Running:
# do_something else
The constants also define the string representation of a state:
>>> str(j.state)
'Running'
SAGA defines the following constants as job states:
- radical.saga.job.UNKNOWN = 'Unknown'¶
- radical.saga.job.NEW = 'New'¶
- radical.saga.job.PENDING = 'Pending'¶
- radical.saga.job.RUNNING = 'Running'¶
- radical.saga.job.DONE = 'Done'¶
- radical.saga.job.CANCELED = 'Canceled'¶
- radical.saga.job.FAILED = 'Failed'¶
- radical.saga.job.SUSPENDED = 'Suspended'¶
Metrics¶
Job metrics provide a way to attach callback functions to a job object. As long as a callback remains registered, it will get triggered whenever a job metric changes.
Callback functions require three parameters:
- source: the watched object instance
- metric: the watched metric (e.g. STATE or STATE_DETAIL)
- value: the new value of the watched metric
Their return value determines if they remain registered (when returning True), or not (when returning False).
Callback functions are attached to a job object via the
add_callback()
method. For example:
# create a callback function
def state_cb (source, metric, value) :
    print("Job %s state changed to %s" % (source, value))
    return True  # remain registered

def main () :
    # register the callback function with the 'State' metric
    job.add_callback (radical.saga.job.STATE, state_cb)
Warning: There is no guarantee that all middleware adaptors implement these metrics. If a metric is not implemented, you can still subscribe to it, but you won’t receive any callbacks. Please refer to the Developer Documentation for more details and adaptor-specific lists of supported metrics.
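The registration semantics sketched above (a callback stays registered while it returns True) can be illustrated in plain Python. The `Watched` class below is a hypothetical stand-in used only for illustration, not the radical.saga implementation:

```python
# Minimal sketch of metric-callback semantics: a callback remains
# registered only as long as it returns True.

class Watched:
    def __init__(self):
        self._callbacks = {}   # metric -> list of registered callbacks
        self._values    = {}   # metric -> current value

    def add_callback(self, metric, cb):
        self._callbacks.setdefault(metric, []).append(cb)

    def _update(self, metric, value):
        # set the new metric value and trigger all registered callbacks;
        # keep only those callbacks that return True
        self._values[metric] = value
        cbs = self._callbacks.get(metric, [])
        self._callbacks[metric] = [cb for cb in cbs if cb(self, metric, value)]

events = []

def state_cb(source, metric, value):
    events.append((metric, value))
    return False   # deregister after the first invocation

w = Watched()
w.add_callback('State', state_cb)
w._update('State', 'Running')
w._update('State', 'Done')   # state_cb no longer registered, not called
```

Returning False from `state_cb` after the first call means the second update triggers no callback, matching the documented deregistration behavior.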
RADICAL SAGA defines the following constants as job metrics:
- radical.saga.job.STATE = 'State'¶
- radical.saga.job.STATE_DETAIL = 'StateDetail'¶
Job Containers – radical.saga.job.Container
¶
See also
More examples on how to use job containers can be found in the Job Module section of the Code Examples chapter.
- class radical.saga.job.Container[source]¶
Bases:
Container
- Todo:
document me
- jobs¶
The (read-only) jobs property returns a list of all job objects in the container.
- Return type:
list [saga.job.Job]
- size¶
The (read-only) size property returns the number of job objects in the container.
- Return type:
int
- states¶
The (read-only) states property returns a list of states that represent the states of the individual jobs in the container.
- Return type:
list
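The three read-only properties above can be sketched in plain Python. `SimpleJob` and `ContainerSketch` are hypothetical stand-ins for illustration, not radical.saga classes:

```python
# Sketch of the container's read-only properties: jobs, size, states.

class SimpleJob:
    def __init__(self, state):
        self.state = state

class ContainerSketch:
    def __init__(self, jobs):
        self._jobs = list(jobs)

    @property
    def jobs(self):
        # all job objects in the container
        return list(self._jobs)

    @property
    def size(self):
        # number of job objects in the container
        return len(self._jobs)

    @property
    def states(self):
        # states of the individual jobs, in container order
        return [j.state for j in self._jobs]

c = ContainerSketch([SimpleJob('Running'), SimpleJob('Done')])
```

With two jobs in the container, `c.size` is 2 and `c.states` mirrors the per-job states.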
Namespaces¶
Introduction¶
Namespaces are an abstraction over filesystems and other hierarchical constructs which have a notion of a saga.namespace.Directory and of saga.namespace.Entry objects which exist in those directories. The API provides a number of operations, which all behave similarly to the common unix command line tools (cp, ls, rm, etc.).
Example:
import re

# get a directory handle
dir = radical.saga.namespace.Directory ("sftp://localhost/tmp/")
# create a subdir
dir.make_dir ("data/")
# list contents of the directory
entries = dir.list ()
# copy *.dat entries into the subdir
for e in entries :
    if re.match (r'^.*\.dat$', str(e)) :
        dir.copy (e, "sftp://localhost/tmp/data/")
The above example covers most of the semantics of the namespace package – additional capabilities, such as get_size() or move(), can be found in the individual class documentation.
Flags¶
The following constants are defined as valid flags for file and directory methods:
- radical.saga.namespace.OVERWRITE¶
- radical.saga.namespace.RECURSIVE¶
- radical.saga.namespace.CREATE¶
- radical.saga.namespace.CREATE_PARENTS¶
- radical.saga.namespace.LOCK¶
- radical.saga.namespace.EXCLUSIVE¶
- radical.saga.namespace.DEREFERENCE¶
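These flags are bitmask constants that can be OR-ed together and passed to the methods below. The numeric values in this sketch are illustrative assumptions only; real code should use the radical.saga.namespace constants themselves:

```python
# Illustrative bitmask flags (values are assumptions, not the actual ones):
OVERWRITE      = 1
RECURSIVE      = 2
DEREFERENCE    = 4
CREATE         = 8
EXCLUSIVE      = 16
LOCK           = 32
CREATE_PARENTS = 64

# combine flags with bitwise OR, test them with bitwise AND
flags = CREATE | RECURSIVE
create_set    = bool(flags & CREATE)       # True
overwrite_set = bool(flags & OVERWRITE)    # False
```

A call such as `dir.make_dir('a/b/', CREATE | CREATE_PARENTS)` would then request both behaviors at once.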
Entry – radical.saga.namespace.Entry
¶
- class radical.saga.namespace.Entry(url=None, flags=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Base
,Async
Represents a SAGA namespace entry as defined in GFD.90
The saga.namespace.Entry class represents, as the name indicates, an entry in some (local or remote) namespace. That class offers a number of operations on that entry, such as copy, move and remove:
# get an entry handle
entry = saga.namespace.Entry ("sftp://localhost/tmp/data/data.bin")
# copy the entry
entry.copy ("sftp://localhost/tmp/data/data.bak")
# move the entry
entry.move ("sftp://localhost/tmp/data/data.new")
- __init__(url=None, flags=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
- Parameters:
url (saga.Url) – Url of the (remote) entry
flags – flags enum
session – saga.Session
ret: obj
Construct a new entry object
The specified entry is expected to exist – otherwise a DoesNotExist exception is raised. Also, the URL must point to an entry (not to a directory), otherwise a BadParameter exception is raised.
Example:
# get an entry handle
entry = saga.namespace.Entry("sftp://localhost/tmp/data/data.bin")
# print the entry's url
print(entry.get_url ())
- close(timeout=None, ttype=None)[source]¶
timeout: float ttype: saga.task.type enum ret: None / saga.Task
- copy(tgt, flags=0, ttype=None)[source]¶
tgt: saga.Url flags: enum flags ttype: saga.task.type enum ret: None / saga.Task
Copy the entry to another location
- Parameters:
target – Url of the copy target.
flags – Flags to use for the operation.
The entry is copied to the given target location. The target URL must be an absolute path, and can be a target entry name or target directory name. If the target entry exists, it is overwritten:
# copy an entry
entry = saga.namespace.Entry("sftp://localhost/tmp/data/data.bin")
entry.copy ("sftp://localhost/tmp/data/data.bak")
- property cwd¶
ttype: saga.task.type enum ret: string / saga.Task
- get_url(ttype=None)[source]¶
ttype: saga.task.type enum ret: saga.Url / saga.Task
Return the complete url pointing to the entry.
The call will return the complete url pointing to this entry as a saga.Url object:
# print URL of an entry
entry = saga.namespace.Entry("sftp://localhost/etc/passwd")
print(entry.get_url())
- is_dir(ttype=None)[source]¶
ttype: saga.task.type enum ret: bool / saga.Task
Returns True if path is a directory, False otherwise.
Example:
# inspect an entry
dir = saga.namespace.Directory("sftp://localhost/tmp/")
if dir.is_dir ('data'):
    # do something
- link(tgt, flags=0, ttype=None)[source]¶
tgt: saga.Url flags: enum flags ttype: saga.task.type enum ret: None / saga.Task
- move(tgt, flags=0, ttype=None)[source]¶
- Parameters:
target – Url of the move target.
flags – Flags to use for the operation.
ttype: saga.task.type enum ret: None / saga.Task
Move the entry to another location
The entry is copied to the given target location. The target URL must be an absolute path, and can be a target entry name or target directory name. If the target entry exists, it is overwritten:
# move an entry
entry = rs.namespace.Entry("sftp://localhost/tmp/data/data.bin")
entry.move ("sftp://localhost/tmp/data/data.bak")
- property name¶
ttype: saga.task.type enum ret: string / saga.Task
- read_link(ttype=None)[source]¶
tgt: saga.Url / None ttype: saga.task.type enum ret: saga.Url / saga.Task
- remove(flags=0, ttype=None)[source]¶
- Parameters:
flags – Flags to use for the operation.
ttype: saga.task.type enum ret: None / saga.Task
Remove the entry.
The entry is removed, and this object instance is then invalid for further operations.
# remove an entry
entry = rs.namespace.Entry("sftp://localhost/tmp/data/data.bin")
entry.remove ()
- property url¶
ttype: saga.task.type enum ret: saga.Url / saga.Task
Return the complete url pointing to the entry.
The call will return the complete url pointing to this entry as a saga.Url object:
# print URL of an entry
entry = saga.namespace.Entry("sftp://localhost/etc/passwd")
print(entry.get_url())
Directory – radical.saga.namespace.Directory
¶
- class radical.saga.namespace.Directory(url=None, flags=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Entry
Represents a SAGA directory as defined in GFD.90
The saga.namespace.Directory class represents, as the name indicates, a directory on some (local or remote) namespace. That class offers a number of operations on that directory, such as listing its contents, copying entries, or creating subdirectories:
# get a directory handle
dir = saga.namespace.Directory("sftp://localhost/tmp/")
# create a subdir
dir.make_dir ("data/")
# list contents of the directory
entries = dir.list ()
# copy *.dat entries into the subdir
for e in entries :
    if re.match (r'^.*\.dat$', str(e)) :
        dir.copy (e, "sftp://localhost/tmp/data/")
Implementation note:¶
The SAGA API Specification (GFD.90) prescribes method overloading on method signatures, but that is not supported by Python (Python only does method overwriting). So we implement one generic method version here, and do the case switching based on the provided parameter set.
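The case switching the note describes can be sketched with a single generic method that dispatches on which parameters were supplied. The helper below is a simplified, hypothetical stand-in for how a method like copy() distinguishes its one- and two-argument forms:

```python
# One generic method, dispatched on the provided parameter set
# (hypothetical helper, not the radical.saga implementation).

def copy(url_1, url_2=None):
    if url_2 is None:
        # one-parameter form: copy this entry itself to url_1
        return ('copy_self_to', url_1)
    # two-parameter form: copy url_1 (relative to this directory) to url_2
    return ('copy_src_to_tgt', url_1, url_2)
```

Both GFD.90 overloads thus share one Python signature, with the optional parameter selecting the behavior.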
- __init__(url=None, flags=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
- Parameters:
url (saga.Url) – Url of the (remote) entry system directory.
flags – flags enum
session – saga.Session
ret: obj
Construct a new directory object
The specified directory is expected to exist – otherwise a DoesNotExist exception is raised. Also, the URL must point to a directory (not to an entry), otherwise a BadParameter exception is raised.
Example:
# open some directory
dir = saga.namespace.Directory("sftp://localhost/tmp/")
# and list its contents
entries = dir.list ()
- change_dir(url, flags=0, ttype=None)[source]¶
url: saga.Url flags: flags enum ttype: saga.task.type enum ret: None / saga.Task
- copy(url_1, url_2=None, flags=0, ttype=None)[source]¶
- Parameters:
src – path of the entry to copy
tgt – absolute URL of target name or directory
url_1: saga.Url url_2: saga.Url / None flags: flags enum / None ttype: saga.task.type enum / None ret: None / saga.Task
Copy an entry from source to target
The source is copied to the given target directory. The path of the source can be relative:
# copy an entry
dir = saga.namespace.Directory("sftp://localhost/tmp/")
dir.copy ("./data.bin", "sftp://localhost/tmp/data/")
- exists(path, ttype=None)[source]¶
- Parameters:
path – path of the entry to check
ttype: saga.task.type enum ret: bool / saga.Task
Returns True if path exists, False otherwise.
Example:
# inspect an entry
dir = saga.namespace.Directory("sftp://localhost/tmp/")
if dir.exists ('data'):
    # do something
- find(pattern, flags=2, ttype=None)[source]¶
pattern: string flags: flags enum ttype: saga.task.type enum ret: list [saga.Url] / saga.Task
- is_dir(tgt=None, ttype=None)[source]¶
tgt: saga.Url / None ttype: saga.task.type enum ret: bool / saga.Task
Returns True if path is a directory, False otherwise.
Example:
# inspect an entry
dir = saga.namespace.Directory("sftp://localhost/tmp/")
if dir.is_dir ('data'):
    # do something
- is_entry(tgt=None, ttype=None)[source]¶
tgt: saga.Url / None ttype: saga.task.type enum ret: bool / saga.Task
- is_link(tgt=None, ttype=None)[source]¶
tgt: saga.Url / None ttype: saga.task.type enum ret: bool / saga.Task
- link(url_1, url_2=None, flags=0, ttype=None)[source]¶
src: saga.Url tgt: saga.Url flags: flags enum ttype: saga.task.type enum ret: None / saga.Task
- list(pattern=None, flags=0, ttype=None)[source]¶
- Parameters:
pattern – Entry name pattern (like POSIX ‘ls’, e.g. ‘*.txt’)
flags: flags enum ttype: saga.task.type enum ret: list [saga.Url] / saga.Task
List the directory’s content
The call will return a list of entries and subdirectories within the directory:
# list contents of the directory
for f in dir.list() :
    print(f)
- make_dir(tgt, flags=0, ttype=None)[source]¶
- Parameters:
tgt – name/path of the new directory
flags – directory creation flags
ttype: saga.task.type enum ret: None / saga.Task
Create a new directory
The call creates a directory at the given location.
Example:
# create a subdir 'data' in /tmp
dir = saga.namespace.Directory("sftp://localhost/tmp/")
dir.make_dir ('data/')
- move(url_1, url_2=None, flags=0, ttype=None)[source]¶
- Parameters:
src – path of the entry to copy
tgt – absolute URL of target directory
flags: flags enum ttype: saga.task.type enum ret: None / saga.Task
Move an entry from source to target
The source is moved to the given target directory. The path of the source can be relative:
# move an entry
dir = saga.namespace.Directory("sftp://localhost/tmp/")
dir.move ("./data.bin", "sftp://localhost/tmp/data/")
- open(name, flags=None, ttype=None)[source]¶
name: saga.Url flags: saga.namespace.flags enum ttype: saga.task.type enum ret: saga.namespace.Entry / saga.Task
- open_dir(path, flags=None, ttype=None)[source]¶
- Parameters:
path – name/path of the directory to open
flags – directory creation flags
ttype: saga.task.type enum ret: saga.namespace.Directory / saga.Task
Open and return a new directory
The call opens and returns a directory at the given location.
Example:
# create a subdir 'data' in /tmp
dir = saga.namespace.Directory("sftp://localhost/tmp/")
data = dir.open_dir ('data/', saga.namespace.CREATE)
Files and Directories¶
Introduction¶
The file management API provides the ability to interact with (local and remote) file systems via two classes, saga.filesystem.Directory and saga.filesystem.File. The API provides a number of operations, which all behave similarly to the common unix command line tools (cp, ls, rm, etc.).
Example:
import re

# get a directory handle
dir = radical.saga.filesystem.Directory ("sftp://localhost/tmp/")
# create a subdir
dir.make_dir ("data/")
# list contents of the directory
files = dir.list ()
# copy *.dat files into the subdir
for f in files :
    if re.match (r'^.*\.dat$', str(f)) :
        dir.copy (f, "sftp://localhost/tmp/data/")
The above example covers most of the semantics of the filesystem package – additional capabilities, such as get_size() or move(), can be found in the individual class documentation.
Flags¶
The following constants are defined as valid flags for file and directory methods:
- radical.saga.filesystem.OVERWRITE¶
- radical.saga.filesystem.RECURSIVE¶
- radical.saga.filesystem.CREATE¶
- radical.saga.filesystem.CREATE_PARENTS¶
- radical.saga.filesystem.LOCK¶
- radical.saga.filesystem.EXCLUSIVE¶
- radical.saga.filesystem.DEREFERENCE¶
File – radical.saga.filesystem.File
¶
- class radical.saga.filesystem.File(url=None, flags=512, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Entry
Represents a local or remote file.
The saga.filesystem.File class represents, as the name indicates, a file on some (local or remote) filesystem. That class offers a number of operations on that file, such as copy, move and remove:
# get a file handle
file = saga.filesystem.File("sftp://localhost/tmp/data/data.bin")
# copy the file
file.copy ("sftp://localhost/tmp/data/data.bak")
# move the file
file.move ("sftp://localhost/tmp/data/data.new")
- __init__(url, flags=READ, session=None)[source]¶
Construct a new file object
- Parameters:
url (saga.Url) – Url of the (remote) file
flags – Flags
session – saga.Session

The specified file is expected to exist – otherwise a DoesNotExist exception is raised. Also, the URL must point to a file (not to a directory), otherwise a BadParameter exception is raised.
Example:
# get a file handle
file = saga.filesystem.File("sftp://localhost/tmp/data/data.bin")
# print the file's size
print(file.get_size ())
- close(kill=True, ttype=None)[source]¶
kill : bool ttype: saga.task.type enum ret: string / bytearray / saga.Task
- get_size()[source]¶
Returns the size (in bytes) of a file.
Example:
# get a file handle
file = saga.filesystem.File("sftp://localhost/tmp/data/data.bin")
# print the file's size
print(file.get_size ())
- property size¶
get_size()
Returns the size (in bytes) of a file.
Example:
# get a file handle
file = saga.filesystem.File("sftp://localhost/tmp/data/data.bin")
# print the file's size
print(file.get_size ())
Directory – radical.saga.filesystem.Directory
¶
- class radical.saga.filesystem.Directory(url=None, flags=512, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Directory
Represents a (remote) directory.
The saga.filesystem.Directory class represents, as the name indicates, a directory on some (local or remote) filesystem. That class offers a number of operations on that directory, such as listing its contents, copying files, or creating subdirectories:
# get a directory handle
dir = saga.filesystem.Directory("sftp://localhost/tmp/")
# create a subdir
dir.make_dir ("data/")
# list contents of the directory
files = dir.list ()
# copy *.dat files into the subdir
for f in files :
    if re.match (r'^.*\.dat$', str(f)) :
        dir.copy (f, "sftp://localhost/tmp/data/")
- __init__(url, flags=READ, session=None)[source]¶
Construct a new directory object
- Parameters:
url (saga.Url) – Url of the (remote) directory
flags – Flags
session – saga.Session
The specified directory is expected to exist – otherwise a DoesNotExist exception is raised. Also, the URL must point to a directory (not to a file), otherwise a BadParameter exception is raised.
Example:
# open some directory
dir = saga.filesystem.Directory("sftp://localhost/tmp/")
# and list its contents
files = dir.list ()
- close(kill=True, ttype=None)[source]¶
kill : bool ttype: saga.task.type enum ret: string / bytearray / saga.Task
- get_size(path=None)[source]¶
Return the size of the directory itself or the entry pointed to by path.
- Parameters:
path (str()) – (Optional) name/path of an entry
Returns the size of a file or directory (in bytes)
Example:
# inspect a file for its size
dir = saga.filesystem.Directory("sftp://localhost/tmp/")
size = dir.get_size ('data/data.bin')
print(size)
- is_file(path=None)[source]¶
Returns True if entry points to a file, False otherwise. If path is not none, the entry pointed to by path is inspected instead of the directory object itself.
- Parameters:
path (str()) – (Optional) name/path of an entry
- open(path, flags=READ)[source]¶
Open a file in the directory instance namespace. Returns a new file object.
- Parameters:
path (str()) – The name/path of the file to open
flags – Flags
- open_dir(path, flags=READ)[source]¶
Open a directory in the directory instance namespace. Returns a new directory object.
- Parameters:
path (str()) – The name/path of the directory to open
flags – Flags
Example:
# open the subdir 'data' in /tmp, creating it if needed
dir = saga.filesystem.Directory("sftp://localhost/tmp/")
data = dir.open_dir ('data/', saga.filesystem.CREATE)
- property size¶
get_size(path=None)
Return the size of the directory itself or the entry pointed to by path.
- Parameters:
path (str()) – (Optional) name/path of an entry
Returns the size of a file or directory (in bytes)
Example:
# inspect a file for its size
dir = saga.filesystem.Directory("sftp://localhost/tmp/")
size = dir.get_size ('data/data.bin')
print(size)
Resource Management¶
Let’s start with a basic example. We start a VM on Amazon EC2 using the SAGA resource API, submit a job to the newly instantiated VM using the SAGA job API and finally shut down the VM.
Note
In order to run this example, you need an account with Amazon EC2. You also need your Amazon EC2 id and key.
#!/usr/bin/env python
__author__ = "Andre Merzky, Ole Weidner"
__copyright__ = "Copyright 2012-2013, The SAGA Project"
__license__ = "MIT"
""" This is an example which shows how to access Amazon EC2 clouds via the SAGA
resource package.
In order to run this example, you need to set the following environment
variables:
* EC2_ACCESS_KEY: your Amazon EC2 ID
* EC2_SECRET_KEY: your Amazon EC2 KEY
* EC2_SSH_KEYPAIR_ID: name of ssh keypair within EC2
* EC2_SSH_KEYPAIR: your ssh keypair to use to access the VM, e.g.,
/home/username/.ssh/id_rsa_ec2
"""
import os
import sys
import time
import radical.saga as rs
# ------------------------------------------------------------------------------
#
def main():

    # In order to connect to EC2, we need an EC2 ID and KEY. We read those
    # from the environment.
    ec2_ctx = rs.Context('EC2')
    ec2_ctx.user_id  = os.environ['EC2_ACCESS_KEY']
    ec2_ctx.user_key = os.environ['EC2_SECRET_KEY']

    # The SSH keypair we want to use to access the EC2 VM. If the keypair is
    # not yet registered on EC2, saga will register it automatically. This
    # context specifies the key for VM startup, i.e. the VM will be configured
    # to accept this key.
    ec2keypair_ctx          = rs.Context('EC2_KEYPAIR')
    ec2keypair_ctx.token    = os.environ['EC2_KEYPAIR_ID']
    ec2keypair_ctx.user_key = os.environ['EC2_KEYPAIR']
    ec2keypair_ctx.user_id  = 'root'  # the user id on the target VM

    # We specify the *same* ssh key for ssh access to the VM. That should
    # work if the VM got configured correctly per the 'EC2_KEYPAIR' context
    # above.
    ssh_ctx          = rs.Context('SSH')
    ssh_ctx.user_id  = 'root'
    ssh_ctx.user_key = os.environ['EC2_KEYPAIR']

    session = rs.Session(False)  # False: don't use other (default) contexts
    session.contexts.append(ec2_ctx)
    session.contexts.append(ec2keypair_ctx)
    session.contexts.append(ssh_ctx)

    cr  = None  # compute resource handle
    rid = None  # compute resource ID

    try:
        # ----------------------------------------------------------------------
        #
        # reconnect to VM (ID given in ARGV[1])
        #
        if len(sys.argv) > 1:
            rid = sys.argv[1]

            # reconnect to the given resource
            print('reconnecting to %s' % rid)
            cr = rs.resource.Compute(id=rid, session=session)
            print('reconnected to %s' % rid)
            print(" state : %s (%s)" % (cr.state, cr.state_detail))

        # ----------------------------------------------------------------------
        #
        # start a new VM
        #
        else:
            # start a VM if needed
            # in our session, connect to the EC2 resource manager
            rm = rs.resource.Manager("ec2://aws.amazon.com/", session=session)

            # Create a resource description with an image and an OS template.
            # We pick a small VM and a plain Ubuntu image...
            cd = rs.resource.ComputeDescription()
            cd.image    = 'ami-0256b16b'  # plain ubuntu
            cd.template = 'Small Instance'

            # Create a VM instance from that description.
            cr  = rm.acquire(cd)
            rid = cr.id
            print("\nWaiting for VM to become active...")

        # ----------------------------------------------------------------------
        #
        # use the VM
        #
        # Wait for the VM to 'boot up', i.e., become 'ACTIVE'
        cr.wait(rs.resource.ACTIVE)

        # Query some information about the newly created VM
        print("Created VM: %s" % cr.id)
        print(" state : %s (%s)" % (cr.state, cr.state_detail))
        print(" access : %s" % cr.access)

        # give the VM some time to start up completely, otherwise the
        # subsequent job submission might end up failing...
        time.sleep(60)

        # create a job service which uses the VM's access URL (cr.access)
        js = rs.job.Service(cr.access, session=session)

        jd = rs.job.Description()
        jd.executable = '/bin/sleep'
        jd.arguments  = ['30']

        job = js.create_job(jd)
        job.run()

        print("\nRunning Job: %s" % job.id)
        print(" state : %s" % job.state)
        job.wait()
        print(" state : %s" % job.state)

    except rs.SagaException as ex:
        # Catch all saga exceptions
        print("An exception occurred: (%s) %s " % (ex.type, (str(ex))))
        raise

    except Exception as e:
        # Catch all other exceptions
        print("An exception occurred: %s " % e)
        raise

    finally:
        # ----------------------------------------------------------------------
        #
        # shut the VM down
        if cr and rid:
            cr.destroy()
            print("\nDestroyed VM: %s" % cr.id)
            print(" state : %s (%s)" % (cr.state, cr.state_detail))


# ------------------------------------------------------------------------------
#
if __name__ == "__main__":
    sys.exit(main())
Resource Manager – radical.saga.resource.Manager
¶
- class radical.saga.resource.Manager(url=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Base
,Async
In the context of RADICAL-SAGA, a ResourceManager is a service which asserts control over a set of resources. That manager can, on request, render control over subsets of those resources (resource slices) to an application.
This
Manager
class represents the contact point to such ResourceManager instances – the application can thus acquire compute, data or network resources, according to some resource specification, for a bound or unbound amount of time.
- __init__(url)[source]¶
Create a new Manager instance. Connect to a remote resource management endpoint.
- Parameters:
url (
saga.Url
) – resource management endpoint
- acquire(desc)[source]¶
Create a new
saga.resource.Resource
handle for a resource specified by the description.
- Parameters:
spec (Description or Url) – specifies the resource
Depending on the RTYPE attribute in the description, the returned resource may be a saga.resource.Compute, saga.resource.Storage or saga.resource.Network instance.
- destroy(rid)[source]¶
Destroy / release a resource.
- Parameters:
rid (str) – identifies the resource to be released
- get_description(rid)[source]¶
Get the resource
Description
for the specified resource.
- Parameters:
rid (str) – identifies the resource to be described.
- get_image(name)[source]¶
Get a description string for the specified image.
- Parameters:
name (str) – specifies the image name
- get_template(name)[source]¶
Get a
Description
for the specified template.
- Parameters:
name (str) – specifies the name of the template
The returned resource description instance may not have all attributes filled, and may in fact not be sufficiently complete to allow for successful resource acquisition. The only guaranteed attribute in the returned description is TEMPLATE, containing the very template id specified in the call parameters.
- list(rtype=None)[source]¶
List known resource instances (which can be acquired). Returns a list of IDs.
- Parameters:
rtype (None or enum (COMPUTE | STORAGE | NETWORK)) – filter for one or more resource types
Resource Description – radical.saga.resource.Description
¶
- class radical.saga.resource.Description(d=None)[source]¶
The resource description class.
Resource descriptions are used for two purposes:
an application can pass a description instance to a saga.resource.Manager instance, to request control over the resource slice described in the description;
an application can request a resource’s description for inspection of resource properties.
There are three specific types of descriptions:
saga.resource.ComputeDescription for the description of resources with compute capabilities;
saga.resource.StorageDescription for the description of resources with data storage capabilities;
saga.resource.NetworkDescription for the description of resources with communication capabilities.
There is at this point no notion of resources which combine different capabilities.
For all these capabilities, the following attributes are supported:
- RType : Enum, describing the capabilities of the resource (COMPUTE, STORAGE or NETWORK)
- Template : String, a backend specific resource class with some pre-defined hardware properties to apply to the resource.
- Image : String, a backend specific resource class with some pre-defined software properties to apply to the resource.
- Dynamic : Boolean, if True signifies that the resource may dynamically change its properties at runtime
- Start : Integer (seconds) since epoch when the resource is expected to enter / when the resource entered ACTIVE state.
- End : Integer (seconds) since epoch when the resource is expected to enter / when the resource entered a FINAL state.
- Duration : Integer, seconds for which the resource is expected to remain / the resource remained in ACTIVE state.
- MachineOS : String, for COMPUTE resources, specifies the operating system type running on that resource.
- MachineArch : String, for COMPUTE resources, specifies the machine architecture of that resource.
- Size : Integer, for COMPUTE resources specifies the number of process slots provided; for STORAGE resources specifies the number of bytes of the resource.
- Memory : Integer, for COMPUTE resources, specifies the number of bytes provided as memory.
- Access : String, usually a URL, which specifies the contact point for the resource capability interface / service interface.
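A compute resource request built from these attributes might look as follows. This is a hypothetical plain-Python sketch (a dict stand-in with a minimal sanity check), not the radical.saga.resource.Description API:

```python
# Sketch of a COMPUTE resource request using the attributes listed above.
compute_request = {
    'RType'    : 'COMPUTE',
    'Template' : 'Small Instance',   # backend specific hardware class
    'Image'    : 'ami-0256b16b',     # backend specific software class
    'Size'     : 1,                  # process slots (COMPUTE)
    'Duration' : 3600,               # seconds to remain ACTIVE
}

def validate(request):
    # minimal sanity check: RType must name a known capability
    if request['RType'] not in ('COMPUTE', 'STORAGE', 'NETWORK'):
        raise ValueError('unknown RType: %s' % request['RType'])
    return request
```

In the real API these would be attributes on a saga.resource.ComputeDescription instance passed to Manager.acquire().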
Resource – radical.saga.resource.Resource
¶
- class radical.saga.resource.Resource(id=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
A Resource class instance represents a specific slice of resource which is, if in RUNNING state, under the application’s control and ready to serve usage requests. The type of accepted usage requests depends on the specific resource type: job execution for saga.resource.Compute, data storage for saga.resource.Storage, and network connectivity for saga.resource.Network. The exact mechanism by which those usage requests are communicated is not part of the resource’s class interface, but is instead served by other RADICAL-SAGA classes – typically saga.job.Service for Compute resources, and saga.filesystem.Directory for Storage resources (Network resources provide implicit connectivity, but do not have explicit, public entry points to request usage).
The process of resource acquisition is performed by a ResourceManager, represented by a saga.resource.Manager instance. The semantics of the acquisition process is defined as the act of moving a slice (subset) of the resources managed by the resource manager under the control of the requesting application (i.e. under user control), to use as needed. The type and properties of the resource slice to be acquired, and the time and duration over which the resource will be made available to the application, are specified in a saga.resource.Description, to be supplied when acquiring a resource.
The exact backend semantics of how a resource slice is provisioned to the application is up to the resource manager backend – this can be as simple as providing a job submission endpoint to a classic HPC resource, and as complex as instantiating a pilot job or pilot data container, reserving a network fiber on demand, or instantiating a virtual machine. The result will, from the application’s perspective, be indistinguishable: a resource slice is made available for the execution of usage requests (tasks, workload, jobs, …).
Resources are stateful: when acquired from a resource manager, they are typically in NEW state, and will become ACTIVE once they are provisioned to the application and can serve usage requests. Some resources may go through an intermediate state, PENDING, when they are about to become active at some point, and usage requests can already be submitted – those usage requests will not be executed until the resource enters the ACTIVE state. A resource can be released from application control in three different ways: it can be actively destroyed by the application, and will then enter the CANCELED state; it can internally cease to function and become unable to serve usage requests, represented by a FAILED state; and the resource manager can retract control from the application because the agreed time duration has passed – this is represented by the EXPIRED state.
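The lifecycle described above can be sketched as a small transition table. This is a hypothetical plain-Python model of the documented states, not radical.saga code, and the exact set of legal transitions is an assumption:

```python
# Sketch of the resource lifecycle: NEW -> (PENDING) -> ACTIVE, with the
# three release paths CANCELED, FAILED, and EXPIRED as final states.

FINAL = {'CANCELED', 'FAILED', 'EXPIRED'}

TRANSITIONS = {
    'NEW'     : {'PENDING', 'ACTIVE', 'CANCELED', 'FAILED'},
    'PENDING' : {'ACTIVE', 'CANCELED', 'FAILED', 'EXPIRED'},
    'ACTIVE'  : {'CANCELED', 'FAILED', 'EXPIRED'},
}

def advance(state, new_state):
    # final states accept no further transitions
    if state in FINAL:
        raise ValueError('resource already final: %s' % state)
    if new_state not in TRANSITIONS[state]:
        raise ValueError('illegal transition %s -> %s' % (state, new_state))
    return new_state
```

For example, a resource in PENDING may accept submitted usage requests, but only the transition to ACTIVE lets them execute.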
- destroy()[source]¶
The semantics of this method is equivalent to the semantics of the
destroy()
call on the saga.resource.Manager
class.
- reconfig(descr)[source]¶
A resource is acquired according to a resource description, i.e. to a specific set of attributes. At some point in time, while the resource is running, the application requirements on the resource may have changed – in that case, the application can request to change the resource’s configuration on the fly.
This method cannot be used to change the type of the resource. Backends may or may not support this operation – if not, a
saga.NotImplemented
exception is raised. If the method is supported, then the semantics of the method is equivalent to the semantics of the acquire()
call on the saga.resource.Manager
class.
- wait(state=FINAL, timeout=None)[source]¶
Wait for a resource to enter a specific state.
- Parameters:
state (enum) – resource state to wait for (UNKNOWN, NEW, PENDING, ACTIVE, DONE, FAILED, EXPIRED, CANCELED, FINAL)
timeout (float) – time in seconds to block while waiting.
This method will block until the resource enters the specified state, or until timeout seconds have passed – whichever occurs earlier. If the resource is in a final state, the call will raise an
saga.IncorrectState
exception when asked to wait for any non-final state. A negative timeout value represents an indefinite timeout.
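The timeout semantics can be illustrated with a minimal polling loop. This is a sketch of the documented behavior (negative timeout means "wait indefinitely"), not how the adaptors actually implement waiting:

```python
import time

def toy_wait(get_state, target, timeout):
    """Block until get_state() returns target, or until timeout seconds pass.

    A negative (or None) timeout means: wait indefinitely.
    Returns True if the target state was reached, False on timeout.
    """
    deadline = None if timeout is None or timeout < 0 else time.time() + timeout
    while True:
        if get_state() == target:
            return True
        if deadline is not None and time.time() >= deadline:
            return False            # timed out before reaching the state
        time.sleep(0.01)            # poll again shortly

# simulate a resource that moves NEW -> PENDING -> ACTIVE
states  = iter(["NEW", "PENDING", "ACTIVE"])
current = {"s": "NEW"}

def get_state():
    current["s"] = next(states, current["s"])
    return current["s"]

print(toy_wait(get_state, "ACTIVE", timeout=1.0))   # -> True
```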
Compute Resource – radical.saga.resource.Compute
¶
- class radical.saga.resource.Compute(id=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Resource
A Compute resource is a resource which provides compute capabilities, i.e. which can execute compute jobs. As such, the ‘Access’ attribute of the compute resource (a URL) can be used to create a
saga.job.Service
instance to submit jobs to.
Storage Resource – radical.saga.resource.Storage
¶
- class radical.saga.resource.Storage(id=None, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
Bases:
Resource
A Storage resource is a resource which has storage capabilities, i.e. the ability to persistently store, organize and retrieve data. As such, the ‘Access’ attribute of the storage resource (a URL) can be used to create a
saga.filesystem.Directory
instance to manage the resource’s data space.
Network Resource – radical.saga.resource.Network
¶
Replica Management¶
The replica management module provides an interface to (distributed) data replication services, like for example iRODS.
The basic usage of the replica module is as follows:
myfile = radical.saga.replica.LogicalFile("irods://localhost/"+TEMP_FILENAME)
myfile.add_location("irods:////data/cache/AGLT2_CE_2_FTPplaceholder/whatever?resource=AGLT2_CE_2_FTP")
mydir = radical.saga.replica.LogicalDirectory("irods://localhost/" + IRODS_DIRECTORY)
mydir.make_dir("anotherdir")
Like all SAGA modules, the replica module relies on middleware adaptors
to provide bindings to a specific resource manager. Adaptors are implicitly
selected via the scheme part of the URL, e.g., irods://
in the example
above selects the iRODS replica adaptor.
Note
A list of available adaptors and supported resource managers can be found in the Developer Documentation part of this documentation.
The rest of this section is structured as follows:
Flags¶
The following constants are defined as valid flags for logical file and directory methods:
- radical.saga.filesystem.OVERWRITE¶
- radical.saga.filesystem.RECURSIVE¶
- radical.saga.filesystem.CREATE¶
- radical.saga.filesystem.CREATE_PARENTS¶
- radical.saga.filesystem.LOCK¶
- radical.saga.filesystem.EXCLUSIVE¶
- radical.saga.filesystem.DEREFERENCE¶
Logical File – radical.saga.replica.LogicalFile
¶
- class radical.saga.replica.LogicalFile(url=None, flags=512, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
- __init__(url=None, flags=READ, session=None)[source]¶
url:     saga.Url
flags:   flags enum
session: saga.Session
ret:     obj
- add_location(name)[source]¶
Add a physical location.
name:  saga.Url
ttype: saga.task.type enum
ret:   None / saga.Task
- download(name, src=None, flags=None)[source]¶
Download a physical file.
name:  saga.Url
src:   saga.Url
flags: flags enum
ttype: saga.task.type enum
ret:   None / saga.Task
- get_size()[source]¶
Return the size of the file.
ttype: saga.task.type enum
ret:   int / saga.Task
Returns the size of the physical file represented by this logical file (in bytes)
Example:
# get a file handle
lf = saga.replica.LogicalFile("irods://localhost/tmp/data.bin")

# print the logical file's size
print(lf.get_size())
- list_locations()[source]¶
List all physical locations of a logical file.
ttype: saga.task.type enum
ret:   list [saga.Url] / saga.Task
- remove_location(name)[source]¶
Remove a physical location.
name:  saga.Url
ttype: saga.task.type enum
ret:   None / saga.Task
- replicate(name)[source]¶
Replicate a logical file.
name:  saga.Url
flags: flags enum
ttype: saga.task.type enum
ret:   None / saga.Task
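The LogicalFile operations above amount to managing the set of physical replicas registered under one logical name. The following toy model illustrates those semantics only; it is purely hypothetical and is not backed by iRODS or any other replica service:

```python
class ToyLogicalFile:
    """Toy model: maps one logical name to a list of physical locations."""

    def __init__(self, logical_url):
        self.logical_url = logical_url
        self.locations   = []

    def add_location(self, url):
        # register an (already existing) physical replica
        if url not in self.locations:
            self.locations.append(url)

    def remove_location(self, url):
        # unregister a physical replica
        self.locations.remove(url)

    def replicate(self, url):
        # a real backend would copy the data, then register the new replica
        self.add_location(url)

    def list_locations(self):
        return list(self.locations)

lf = ToyLogicalFile("irods://localhost/tmp/data.bin")
lf.add_location("irods://siteA/data.bin")
lf.replicate("irods://siteB/data.bin")
print(lf.list_locations())
```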
Logical Directory – radical.saga.replica.LogicalDirectory
¶
- class radical.saga.replica.LogicalDirectory(url=None, flags=512, session=None, _adaptor=None, _adaptor_state={}, _ttype=None)[source]¶
- __init__(url, flags=READ, session=None)[source]¶
Create a new Logical Directory instance.
url:     saga.Url
flags:   flags enum
session: saga.Session
ret:     obj
- find(name_pattern, attr_pattern=None, flags=RECURSIVE)[source]¶
name_pattern: string
attr_pattern: string
flags:        flags enum
ttype:        saga.task.type enum
ret:          list [saga.Url] / saga.Task
- get_size(tgt)[source]¶
tgt:   logical file to get size for
ttype: saga.task.type enum
ret:   int / saga.Task
Returns the size of the physical file represented by the given logical file (in bytes)
Example:
# get a logical directory handle
d = saga.replica.LogicalDirectory("irods://localhost/tmp/data/")

# print a logical file's size
print(d.get_size('data.dat'))
- open(tgt, flags=READ)[source]¶
tgt:   saga.Url
flags: saga.namespace.flags enum
ttype: saga.task.type enum
ret:   saga.namespace.Entry / saga.Task
- open_dir(tgt, flags=READ)[source]¶
- Parameters:
tgt – name/path of the directory to open
flags – directory creation flags
ttype: saga.task.type enum
ret:   saga.namespace.Directory / saga.Task
Open and return a new directory.
The call opens and returns a directory at the given location.
Example:
# create a subdir 'data' in /tmp
dir  = saga.namespace.Directory("sftp://localhost/tmp/")
data = dir.open_dir('data/', saga.namespace.Create)
Exception Handling¶
Exceptions – radical.saga.exceptions
¶
- exception radical.saga.exceptions.AlreadyExists(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,ValueError
The entity to be created already exists. (rank: 8)
- exception radical.saga.exceptions.AuthenticationFailed(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
The backend could not establish a valid identity. (rank: 3)
- exception radical.saga.exceptions.AuthorizationFailed(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
The backend could not authorize the used identity. (rank: 4)
- exception radical.saga.exceptions.BadParameter(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,ValueError
A given parameter is out of bound or ill formatted. (rank: 9)
- exception radical.saga.exceptions.DoesNotExist(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,KeyError
An operation tried to access a non-existing entity. (rank: 7)
- exception radical.saga.exceptions.IncorrectState(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
The operation is not allowed on the entity in its current state. (rank: 6)
- exception radical.saga.exceptions.IncorrectURL(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,ValueError
The given URL could not be interpreted, for example due to an incorrect / unknown schema. (rank: 10)
- exception radical.saga.exceptions.NoSuccess(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
Some other error occurred. (rank: 1)
- exception radical.saga.exceptions.NotImplemented(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,NotImplementedError
radical.saga does not implement this method or class. (rank: 11)
- exception radical.saga.exceptions.PermissionDenied(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
The used identity is not permitted to perform the requested operation. (rank: 5)
- exception radical.saga.exceptions.SagaException(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
Exception
The Exception class encapsulates information about error conditions encountered in SAGA.
Additionally to the error message (e.message), the exception also provides a trace to the code location where the error condition got raised (e.traceback).
Example:
try:
    file = saga.filesystem.File("sftp://comet.xsede.org/tmp/data1.dat")
except saga.Timeout as to:
    # maybe the network is down?
    print("connection timed out")
except saga.Exception as e:
    # something else went wrong
    print("Exception occurred: %s %s" % (e, e.traceback))
There are cases where multiple backends can report errors at the same time. In that case, the radical.saga implementation will collect the exceptions, sort them by their ‘rank’, and return the highest ranked one. All other caught exceptions are available via
get_all_exceptions()
, or via the exceptions property. The rank of an exception defines its specificity: in general, the higher the rank, the better defined / known is the cause of the problem.
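The ranking behavior can be sketched as follows: given exceptions collected from several backends, the implementation reports the highest-ranked (most specific) one. This is a simplified standalone model using the ranks listed in this section, not the actual radical.saga code:

```python
# Exception ranks as documented in this section:
# higher rank = better defined / known cause.
RANKS = {
    "NoSuccess":            1,
    "Timeout":              2,
    "AuthenticationFailed": 3,
    "AuthorizationFailed":  4,
    "PermissionDenied":     5,
    "IncorrectState":       6,
    "DoesNotExist":         7,
    "AlreadyExists":        8,
    "BadParameter":         9,
    "IncorrectURL":        10,
    "NotImplemented":      11,
}

def pick_reported(exception_names):
    """Return the exception that would be reported, plus all others."""
    ordered = sorted(exception_names, key=RANKS.get, reverse=True)
    return ordered[0], ordered[1:]

top, others = pick_reported(["NoSuccess", "BadParameter", "Timeout"])
print(top)      # -> BadParameter: the best-defined cause wins
```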
- get_message()[source]¶
Return the exception message as a string. That message is also available via the ‘message’ property.
- get_object()[source]¶
Return the object that raised this exception. An object may not always be available – for example, exceptions raised during object creation may not have the option to keep an incomplete object instance around. In those cases, this method will return ‘None’. Either way, the object is also accessible via the ‘object’ property.
- property message¶
Return the exception message as a string. That message is also available via the ‘message’ property.
- property object¶
Return the object that raised this exception. An object may not always be available – for example, exceptions raised during object creation may not have the option to keep an incomplete object instance around. In those cases, this method will return ‘None’. Either way, the object is also accessible via the ‘object’ property.
- property type¶
Return the type of the exception as string.
- exception radical.saga.exceptions.Timeout(msg, parent=None, api_object=None, from_log=False)[source]¶
Bases:
SagaException
,RuntimeError
The interaction with the backend times out. (rank: 2)
Security Contexts¶
Context Class – radical.saga.context
¶
- class radical.saga.context.Context(ctype, _adaptor=None, _adaptor_state={})[source]¶
Bases:
Base
,Attributes
A SAGA Context object as defined in GFD.90.
A security context is a description of a security token. It is important to understand that, in general, a context really just describes a token, but that a context is not a token (*). For example, a context may point to a X509 certificate – but it will in general not hold the certificate contents.
Context classes are used to inform the backends used by SAGA about which security tokens are expected to be used. By default, SAGA will be able to pick up such tokens from their default locations, but in some cases it might be necessary to point to them explicitly – in that case, use a Session with Context instances to do so.
The usage example for contexts is below:
# define an ssh context
ctx = saga.Context("SSH")
ctx.user_cert = '$HOME/.ssh/special_id_rsa'
ctx.user_key  = '$HOME/.ssh/special_id_rsa.pub'

# add the context to a session
session = saga.Session()
session.add_context(ctx)

# create a job service in this session -- that job service can now
# *only* use that ssh context.
j = saga.job.Service('ssh://remote.host.net/', session=session)
The Session argument to the job.Service constructor is fully optional – if left out, SAGA will use default session, which picks up some default contexts as described above – that will suffice for the majority of use cases.
(*) The only exception to this rule is the ‘UserPass’ key, which is used to hold plain-text passwords. Use this key with care – it is not good practice to hard-code passwords in the code base, or in config files. Also, be aware that the password may show up in log files, when debugging or analyzing your application.
UserPass Context¶
This context stores a user id and password, to be used for backend connections. This context can be used for SSH connections if it is preferred over public-/private-key authentication.
The following context attributes are supported:
- radical.saga.context.Context("UserPass")¶
The type for this context has to be set to “UserPass” in the constructor, i.e.,
radical.saga.Context("UserPass")
.
- radical.saga.context.user_id¶
The username on the target resource.
- radical.saga.context.user_pass¶
The pass-phrase to use.
Warning
NEVER put plain-text passwords into your source files. They are a huge security risk! Reading passwords from the command line, an environment variable, or a configuration file instead is a much better option.
Example:
ctx = radical.saga.Context("UserPass")
ctx.user_id = "johndoe"
ctx.user_pass = os.environ['MY_USER_PASS']
session = radical.saga.Session()
session.add_context(ctx)
js = radical.saga.job.Service("ssh://machine_y.futuregrid.org",
session=session)
SSH Context¶
This SSH context points to an SSH public/private key-pair and user id to
be used for any ssh-based backend connections, e.g., ssh://
, pbs+ssh://
and so on.
The following context attributes are supported:
- radical.saga.context.Context("SSH")¶
The type for this context has to be set to “SSH” in the constructor, i.e.,
radical.saga.Context("SSH")
.
- radical.saga.context.user_id¶
The username on the target resource.
- radical.saga.context.user_key¶
The public ssh key file to use for the connection. This attribute is useful if an SSH key-pair other than the default one (in $HOME/.ssh/) is required to establish a connection.
- radical.saga.context.user_pass¶
The pass-phrase to use to decrypt a password-protected key.
Warning
NEVER put plain-text passwords into your source files. They are a huge security risk! Reading passwords from the command line, an environment variable, or a configuration file instead is a much better option.
Example:
ctx = radical.saga.Context("SSH")
ctx.user_id = "johndoe"
ctx.user_key = "/home/johndoe/.ssh/key_for_machine_x"
ctx.user_pass = "XXXX" # password to decrypt 'user_key' (if required)
session = radical.saga.Session()
session.add_context(ctx)
js = radical.saga.job.Service("ssh://machine_x.futuregrid.org",
session=session)
X.509 Context¶
The X.509 context points to an existing, local X509 proxy.
The following context attributes are supported:
- radical.saga.context.Context("X509")¶
The type for this context has to be set to “X509” in the constructor, i.e.,
radical.saga.Context("X509")
.
- radical.saga.context.user_proxy¶
The X509 user proxy file to use for the connection. This attribute is useful if a proxy file other than the default one (in /tmp/x509_u<uid>) is required to establish a connection.
Example:
ctx = radical.saga.Context("X509")
ctx.user_proxy = "/tmp/x509_u123_for_machine_y"
session = radical.saga.Session()
session.add_context(ctx)
js = radical.saga.job.Service("gsissh://machine_y.futuregrid.org",
session=session)
MyProxy Context¶
The MyProxy context fetches a delegated X.509 proxy from a (Globus) myproxy server.
The following context attributes are supported:
- radical.saga.context.Context("MyProxy")¶
The type for this context has to be set to “MyProxy” in the constructor, i.e.,
radical.saga.Context("MyProxy")
.
- radical.saga.context.server¶
The hostname of the myproxy server. This is equivalent to
myproxy-logon --pshost
.
- radical.saga.context.user_id¶
The username for the delegated proxy. This is equivalent to
myproxy-logon --username
.
- radical.saga.context.life_time¶
The lifetime of the delegated proxy. This is equivalent to
myproxy-logon --proxy_lifetime
(default is 12h).
- radical.saga.context.user_pass¶
The password for the delegated proxy.
Warning
NEVER put plain-text passwords into your source files. They are a huge security risk! Reading passwords from the command line, an environment variable, or a configuration file instead is a much better option.
Example:
c = radical.saga.Context("MyProxy")
c.server = "myproxy.teragrid.org"
c.user_id = "johndoe"
c.user_pass = os.environ['MY_USER_PASS']
session = radical.saga.Session()
session.add_context(c)
js = radical.saga.job.Service("pbs+gsissh://gsissh.kraken.nics.xsede.org",
session=session)
EC2 Context¶
The EC2 context can be used to authenticate against the Amazon EC2 service.
Note
EC2 Contexts are usually used in conjunction with an EC2_KEYPAIR
and an SSH Context
as shown in the example below.
The following context attributes are supported:
- radical.saga.context.Context("EC2")¶
The type for this context has to be set to “EC2” in the constructor, i.e.,
radical.saga.Context("EC2")
.
- radical.saga.context.user_id¶
The Amazon EC2 ID. See the Amazon Web-Services website for more details.
- radical.saga.context.user_key¶
The Amazon EC2 key. See the Amazon Web-Services website for more details.
Example:
ec2_ctx = radical.saga.Context('EC2')
ec2_ctx.user_id = 'XXXXXXXXXXYYYYYYYYZ'
ec2_ctx.user_key = 'WwwwwwXxxxxxxxxxYyyyyyyyyZzzzzzz'
# The SSH key-pair we want to use to access the EC2 VM. If the keypair is
# not yet registered on EC2 saga will register it automatically.
ec2keypair_ctx = radical.saga.Context('EC2_KEYPAIR')
ec2keypair_ctx.token = 'KeyName'
ec2keypair_ctx.user_key = '$HOME/.ssh/ec2_key'
ec2keypair_ctx.user_id = 'root' # the user id on the target VM
# The same SSH key-pair as above, but this one will be picked up by the SSH
# adaptor. While this is somewhat redundant, it is still necessary because
# of current limitations imposed by 'libcloud', the library which implements
# the radical.saga EC2 adaptor.
ssh_ctx = radical.saga.Context('SSH')
ssh_ctx.user_id = 'root'
ssh_ctx.user_key = '$HOME/.ssh/ec2_key'
session = radical.saga.Session(False) # FALSE: don't use other (default) contexts
session.contexts.append(ec2_ctx)
session.contexts.append(ec2keypair_ctx)
session.contexts.append(ssh_ctx)
EC2_KEYPAIR Context¶
This context refers to an SSH key-pair and is very similar to the SSH Context
described above. It is used to inject a key-pair into an Amazon EC2 VM and
used in conjunction with an EC2 Context
. See above for an example.
The following context attributes are supported:
- radical.saga.context.Context("EC2_KEYPAIR")¶
The type for this context has to be set to “EC2_KEYPAIR” in the constructor, i.e.,
radical.saga.Context("EC2_KEYPAIR")
.
- radical.saga.context.user_id¶
The username on the target resource.
- radical.saga.context.user_key¶
The public ssh key file to use for the connection. This attribute is useful if an SSH key-pair other than the default one (in $HOME/.ssh/) is required to establish a connection.
- radical.saga.context.user_pass¶
The pass-phrase to use to decrypt a password-protected key.
- radical.saga.context.token¶
The Amazon EC2 identifier for this key-pair.
Sessions¶
Session – radical.saga.session
¶
- class radical.saga.session.Session(default=True, uid=None)[source]¶
Bases:
SimpleBase
A SAGA Session object as defined in GFD.90.
A SAGA session has the purpose of scoping the use of security credentials for remote operations. In other words, a session instance acts as a container for security Context instances – SAGA objects (such as job.Service or filesystem.File) created in that session will then use exactly the security contexts from that session (and no others).
That way, the session serves two purposes: (1) it helps SAGA to decide which security mechanism should be used for what interaction, and (2) it helps SAGA to find security credentials which would be difficult to pick up automatically.
The use of a session is as follows:
Example:
# define an ssh context
c = saga.Context('ssh')
c.user_cert = '$HOME/.ssh/special_id_rsa.pub'
c.user_key  = '$HOME/.ssh/special_id_rsa'

# add it to a session
s = saga.Session()
s.add_context(c)

# create a job service in this session -- that job service can now
# *only* use that ssh context.
j = saga.job.Service('ssh://remote.host.net/', s)
The session argument to the job.Service constructor is fully optional – if left out, SAGA will use the default session, which picks up some default contexts as described above – that will suffice for the majority of use cases.
A session instance exposes a contexts property, which is a list of authentication contexts managed by this session. As the contexts and the session are stateless, it is safe to modify this list as needed.
- add_context(ctx)[source]¶
ctx: saga.Context
ret: None
Add a security Context to the session. It is encouraged to use the contexts property instead.
Attributes and Callbacks¶
Callbacks – radical.saga.Callback
¶
- class radical.saga.attributes.Callback[source]¶
Callback base class.
All objects using the Attribute Interface allow one to register a callback for any changes of their attributes, such as ‘state’ and ‘state_detail’. Those callbacks can be Python callables, or derivatives of this Callback base class. Instances which inherit this base class MUST implement (overload) the cb() method.
The callable, or the callback’s cb() method, is what is invoked whenever the SAGA implementation is notified of a change on the monitored object’s attribute.
The cb instance receives three parameters upon invocation:
obj: the watched object instance
key: the watched attribute (e.g. ‘state’ or ‘state_detail’)
val: the new value of the watched attribute
If the callback returns ‘True’, it will remain registered after invocation, to monitor the attribute for the next subsequent state change. On returning ‘False’ (or nothing), the callback will not be called again.
To register a callback on a object instance, use:
class MyCallback(saga.Callback):

    def __init__(self):
        pass

    def cb(self, obj, key, val):
        print(" %s (%s) : %s" % (obj, key, val))

jd = saga.job.Description()
jd.executable = "/bin/date"

js  = saga.job.Service("fork://localhost/")
job = js.create_job(jd)

cb = MyCallback()
job.add_callback(saga.STATE, cb)
job.run()
See documentation of the
saga.Attribute
interface for further details and examples.
- cb(obj, key, val)[source]¶
This is the method that needs to be implemented by the application
Keyword arguments:
obj: the watched object instance
key: the watched attribute
val: the new value of the watched attribute
Return:
keep: bool, signals to keep (True) or remove (False) the callback after invocation
Callback invocation MAY (and in general will) happen in a separate thread – so the application needs to make sure that the callback code is thread-safe.
The boolean return value is used to signal if the callback should continue to listen for events (return True) , or if it rather should get unregistered after this invocation (return False).
Attribute – radical.saga.Attributes
¶
- class radical.saga.attributes.Attributes(*args, **kwargs)[source]¶
Attribute Interface Class
The Attributes interface implements the attribute semantics of the SAGA Core API specification (http://ogf.org/documents/GFD.90.pdf). Additionally, this implementation exposes those semantics via the Python property interface. Note that a single set of attributes is internally managed, no matter which interface is used for access.
A class which uses this interface can internally specify which attributes can be set, and what type they have. Also, default values can be specified, and the class provides a rudimentary support for converting scalar attributes into vector attributes and back.
Also, the consumer of this API can register callbacks, which get triggered on changes to specific attribute values.
Example use case:
# ------------------------------------------------------------------------------
class Transliterator(saga.Attributes):

    def __init__(self, *args, **kwargs):
        # setting attribs to non-extensible will cause the call to init below
        # to complain if attributes are specified.  Default is extensible.
        # self._attributes_extensible (False)

        # pass args to base class init (requires 'extensible')
        super(Transliterator, self).__init__(*args, **kwargs)

        # setup class attribs
        self._attributes_register('apple', 'Appel', URL,    SCALAR, WRITEABLE)
        self._attributes_register('plum',  'Pruim', STRING, SCALAR, READONLY)

        # setting attribs to non-extensible at *this* point will have allowed
        # custom user attribs on __init__ time (via args), but will then forbid
        # any additional custom attributes.
        # self._attributes_extensible (False)

# ------------------------------------------------------------------------------
if __name__ == "__main__":

    # define a callback method.  This callback can get registered for
    # attribute changes later.

    # --------------------------------------------------------------------------
    def cb(key, val, obj):
        # the callback gets information about what attribute was changed
        # on what object:
        print("called: %s - %s - %s" % (key, str(val), type(obj)))

        # returning True will keep the callback registered for further
        # attribute changes.
        return True
    # --------------------------------------------------------------------------

    # create a class instance and add a 'cherry' attribute/value on creation.
    trans = Transliterator(cherry='Kersche')

    # use the property interface to mess with the pre-defined
    # 'apple' attribute
    print("\n -- apple")
    print(trans.apple)
    trans.apple = 'Abbel'
    print(trans.apple)

    # add our callback to the apple attribute, and trigger some changes.
    # Note that the callback is also triggered when the attribute's
    # value changes w/o user control, e.g. by some internal state changes.
    trans.add_callback('apple', cb)
    trans.apple = 'Apfel'

    # Setting an attribute final is actually an internal method, used by
    # the implementation to signal that no further changes on that
    # attribute are expected.  We use that here for demonstrating the
    # concept though.  Callback is invoked on set_final().
    trans._attributes_set_final('apple')
    trans.apple = 'Abbel'
    print(trans.apple)

    # mess around with the 'plum' attribute, which was marked as
    # ReadOnly on registration time.
    print("\n -- plum")
    print(trans.plum)
    # trans.plum    = 'Pflaume'  # raises readonly exception
    # trans['plum'] = 'Pflaume'  # raises readonly exception
    print(trans.plum)

    # check if the 'cherry' attribute exists, which got created on
    # instantiation time.
    print("\n -- cherry")
    print(trans.cherry)

    # as we have 'extensible' set, we can add an attribute on the fly,
    # via either the property interface, or via the GFD.90 API of course.
    print("\n -- peach")
    print(trans.peach)
    trans.peach = 'Birne'
    print(trans.peach)
This example will result in:
-- apple
Appel
Appel
Abbel
called: apple - Abbel Appel - <class '__main__.Transliterator'>
called: apple - Apfel - <class '__main__.Transliterator'>
called: apple - Apfel - <class '__main__.Transliterator'>
Apfel

-- plum
Pruim
Pruim

-- cherry
Kersche

-- peach
Berne
Birne
- add_callback(key, cb)[source]¶
For any attribute change, the API will check if any callbacks are registered for that attribute. If so, those callbacks will be called in order of registration. This registration function will return an id (cookie) identifying the callback – that id can be used to remove the callback.
A callback is any callable python construct, and MUST accept three arguments:
- STRING key: the name of the attribute which changed
- ANY    val: the new value of the attribute
- ANY    obj: the object on which this attribute interface was called
The ‘obj’ can be any python object type, but is guaranteed to expose this attribute interface.
The callback SHOULD return ‘True’ or ‘False’ – on ‘True’, the callback will remain registered, and will thus be called again on the next attribute change. On returning ‘False’, the callback will be unregistered, and will thus not be called again. Returning nothing is interpreted as ‘False’, other return values lead to undefined behavior.
Note that callbacks will not be called on ‘Final’ attributes (they will be called once as that attribute enters finality).
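The callback protocol described above (cookie-based registration, invocation on value changes only, and unregistration when a callback does not return True) can be sketched with a minimal toy class. This is an illustration of the documented semantics only; the real Attributes class additionally handles types, flavors, finality, and threading:

```python
import itertools

class ToyAttrs:
    """Toy model of the documented attribute-callback protocol."""

    def __init__(self):
        self._vals = {}
        self._cbs  = {}                       # key -> {cookie: callable}
        self._ids  = itertools.count()

    def add_callback(self, key, cb):
        cookie = next(self._ids)
        self._cbs.setdefault(key, {})[cookie] = cb
        return cookie                         # id used to remove the callback

    def remove_callback(self, key, cookie):
        if cookie is None:
            self._cbs.pop(key, None)          # remove all callbacks for key
        else:
            del self._cbs[key][cookie]

    def set_attribute(self, key, val):
        old = self._vals.get(key)
        self._vals[key] = val
        if val == old:
            return                            # callbacks fire on changes only
        for cookie, cb in list(self._cbs.get(key, {}).items()):
            # callbacks receive (key, val, obj); anything but True unregisters
            if cb(key, val, self) is not True:
                del self._cbs[key][cookie]

calls = []
attrs = ToyAttrs()
attrs.add_callback("state", lambda k, v, o: calls.append(v) or True)
attrs.set_attribute("state", "ACTIVE")
print(calls)    # -> ['ACTIVE']
```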
- attribute_exists(key, _flow='_down')[source]¶
attribute_exist (key)
This method will check if the given key is known and was set explicitly. The call will also return ‘True’ if the value for that key is ‘None’.
- attribute_is_readonly(key)[source]¶
This method will check if the given key is readonly, i.e. cannot be ‘set’. The call will also return ‘True’ if the attribute is final.
- attribute_is_removable(key, _flow='_down')[source]¶
attribute_is_writeable (key)
This method will check if the given key can be removed.
- attribute_is_vector(key)[source]¶
This method will check if the given attribute has a vector value type.
- attribute_is_writeable(key)[source]¶
This method will check if the given key is writeable - i.e. not readonly.
- find_attributes(pattern)[source]¶
Similar to list(), but also grep for a given attribute pattern. That pattern is of the form ‘key=val’, where both ‘key’ and ‘val’ can contain POSIX shell wildcards. For non-string typed attributes, the pattern is applied to a string serialization of the typed value, if that exists.
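The ‘key=val’ pattern matching can be sketched with Python’s fnmatch module. This is a hypothetical helper assuming the documented POSIX shell wildcards behave like fnmatch patterns; it is not the radical.saga implementation:

```python
from fnmatch import fnmatch

def find_attributes(attrs, pattern):
    """Return keys whose 'key=val' serialization matches the pattern.

    attrs   : dict of attribute name -> value
    pattern : 'key=val', where key and val may contain shell wildcards
    """
    key_pat, _, val_pat = pattern.partition("=")
    return [k for k, v in attrs.items()
            if fnmatch(k, key_pat) and fnmatch(str(v), val_pat)]

attrs = {"executable": "/bin/date", "queue": "normal", "cores": 16}
print(find_attributes(attrs, "exe*=/bin/*"))   # -> ['executable']
```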
- get_attribute(key)[source]¶
This method returns the value of the specified attribute. If that attribute does not exist, a DoesNotExist exception is raised. It is not an error to query an existing but unset attribute, though – that will result in ‘None’ being returned (or the default value, if available).
- get_vector_attribute(key)[source]¶
See also:
saga.Attributes.get_attribute()
(key). As Python can handle scalar and vector types transparently, this method is in fact not very useful. For that reason, it maps internally to the get_attribute method.
- remove_attribute(key)[source]¶
Removing an attribute is actually different from unsetting it, or from setting it to ‘None’. On remove, all traces of the attribute are purged, and the key will not be listed on
saga.Attributes.list_attributes()
anymore.
- remove_callback(key, id)[source]¶
This method allows one to unregister a previously registered callback, by providing its id. It is not an error to remove a non-existing callback, but a valid ID MUST be provided – otherwise, a BadParameter is raised.
If no ID is provided (id == None), all callbacks are removed for this attribute.
- set_attribute(key, val)[source]¶
This method sets the value of the specified attribute. If that attribute does not exist, DoesNotExist is raised – unless the attribute set is marked ‘extensible’ or ‘private’. In that case, the attribute is created and set on the fly (defaulting to mode=writeable, flavor=Scalar, type=ANY, default=None). A value of ‘None’ may reset the attribute to its default value, if such one exists (see documentation).
Note that this method performs a number of checks and conversions, to match the value type to the attribute properties (type, mode, flavor). Those conversions are not guaranteed to yield the expected result – for example, the conversion from ‘scalar’ to ‘vector’ is, for complex types, ambiguous at best. The consumer of the API SHOULD ensure correct attribute values. The conversions are intended to support the most trivial and simple use cases (int to string etc). Failed conversions will result in a BadParameter exception.
Attempts to set a ‘final’ attribute are silently ignored. Attempts to set a ‘readonly’ attribute will result in an IncorrectState exception being raised.
Note that set_attribute() will trigger callbacks, if a new value (different from the old value) is given.
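The scalar/vector conversions mentioned above can be illustrated with two small helpers. This is a hypothetical sketch of one plausible conversion policy, not the actual radical.saga implementation (which, as noted, leaves some cases ambiguous):

```python
def scalar_to_vector(val):
    # trivial conversion: None -> empty vector, scalar -> one-element vector
    if val is None:
        return []
    if isinstance(val, (list, tuple)):
        return list(val)
    return [val]

def vector_to_scalar(val):
    # ambiguous in general -- here: empty -> None, single element -> that
    # element, otherwise a string serialization (one possible choice)
    if not val:
        return None
    if len(val) == 1:
        return val[0]
    return ",".join(str(v) for v in val)

print(scalar_to_vector(42))        # -> [42]
print(vector_to_scalar(["a"]))     # -> a
```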
Developer Documentation¶
Writing RADICAL-SAGA Adaptors¶
Note
This part of the RADICAL-SAGA documentation is not for users of RADICAL-SAGA, but rather for implementors of backend adaptors (although it may be beneficial for users to skim over section Adaptor Binding to gain some insight into RADICAL-SAGA’s mode of operation).
Adaptor Structure¶
A RADICAL-SAGA adaptor is a Python module with well defined structure. The
module must expose a class Adaptor
, which (a) must be a singleton, (b) must
provide a sanity_check()
method, and (c) must inherit from
radical.saga.cpi.AdaptorBase
. That base class’ constructor (__init__
)
must be called like this:
class Adaptor (rs.cpi.AdaptorBase):
__metaclass__ = Singleton
def __init__ (self) :
rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
def sanity_check (self) :
# FIXME: detect gsissh tool
pass
_ADAPTOR_INFO and _ADAPTOR_OPTIONS are module-level Python dicts with the following layout:
_ADAPTOR_NAME = 'rs.adaptor.gsissh.job'
_ADAPTOR_SCHEMAS = ['ssh', 'gsissh']
_ADAPTOR_OPTIONS = [
{
'category' : _ADAPTOR_NAME,
'name' : 'cache_connection',
'type' : bool,
'default' : True,
'valid_options' : [True, False],
'documentation' : 'toggle connection caching',
'env_variable' : None
},
]
_ADAPTOR_INFO = {
'name' : _ADAPTOR_NAME,
'version' : 'v0.1',
'cpis' : [
{
'type' : 'rs.job.Service',
'class' : 'GSISSHJobService',
'schemas' : _ADAPTOR_SCHEMAS
},
{
'type' : 'rs.job.Job',
'class' : 'GSISSHJob',
'schemas' : _ADAPTOR_SCHEMAS
}
]
}
(It is beneficial to specify _ADAPTOR_NAME
and _ADAPTOR_SCHEMAS
separately, as they are used in multiple places, as explained later on.)
The adaptor classes listed in the _ADAPTOR_INFO
(in this example,
GSISSHJob
and GSISSHJobService
) are the classes which are actually bound
to a SAGA API object, and provide its functionality. For that purpose, those
classes must inherit the respective object’s Capability Provider Interface
(CPI), as shown in the stub below:
class GSISSHJobService (rs.cpi.job.Service) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
@SYNC
def init_instance (self, rm_url, session) :
self._rm = rm_url
self._session = session
The radical.saga.cpi.Base
class will make sure that the adaptor classes keep
a self._adaptor
member, pointing to the adaptor singleton instance (i.e. the
module’s Adaptor
class instance). It will further initialize a logging module
(available as self._logger
).
Note that the adaptor class' __init__
does not correspond to the API level
object __init__
– instead, the adaptor class construction is a two step
process, and the actual constructor semantics is mapped to an
init_instance()
method, which receives the API level constructor arguments.
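The two-step construction described above can be sketched as follows; the class names and the minimal base class are illustrative stand-ins for the actual CPI classes, kept only to show how __init__ and init_instance() divide the work:

```python
class AdaptorClassBase:
    """Stand-in for rs.cpi.Base: binds bookkeeping state only."""

    def __init__(self, api, adaptor, name):
        self._api     = api       # back-reference to the API-level object
        self._adaptor = adaptor   # the module's Adaptor singleton
        self._name    = name


class ExampleJobService(AdaptorClassBase):

    def __init__(self, api, adaptor):
        # step one: plain instantiation, no API constructor arguments yet
        AdaptorClassBase.__init__(self, api, adaptor, 'ExampleJobService')

    def init_instance(self, rm_url, session):
        # step two: the actual constructor semantics, receiving the
        # API-level constructor arguments
        self._rm      = rm_url
        self._session = session
        return self
```

The engine can thus instantiate the class first and only afterwards decide whether init_instance() succeeds for the given URL.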
Adaptor Registration¶
Any SAGA adaptor must be registered with the Engine in order to be
usable. That process is very simple, and performed by the
radical.saga.cpi.AdaptorBase
class – so all the adaptor has to take care
of is the correct initialization of that base class, as described in
Adaptor Structure. The AdaptorBase
will forward the
_ADAPTOR_INFO
to the radical.saga.engine.Engine
class, where the adaptor
will be added to a registry of adaptor classes, hierarchically sorted like
this (simplified):
Engine._adaptor_registry =
{
'rs.job.Service' :
{
'gsissh' : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
'ssh'    : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
'gram'   : [rs.adaptors.globus.job.GRAMJobService,   ...],
...
},
'rs.job.Job' :
{
'gsissh' : [rs.adaptors.gsissh.job.GSISSHJob, ...],
'ssh'    : [rs.adaptors.gsissh.job.GSISSHJob, ...],
'gram'   : [rs.adaptors.globus.job.GRAMJob,   ...],
...
},
...
}
That registry is searched when the engine binds an adaptor class instance to a SAGA API object instance – see Adaptor Binding.
Adaptor Binding¶
Whenever a SAGA API object is created, or whenever any method is invoked on that object, the RADICAL-SAGA implementation needs to (a) select a suitable backend adaptor to perform that operation, and (b) invoke the respective adaptor functionality.
The first part, selecting a specific adaptor for a specific API object instance, is called binding – RADICAL-SAGA binds an adaptor to an object exactly once, at creation time, and the bond remains valid for the lifetime of the API object: on API creation, the API object will request a suitable adaptor from the engine, and will keep it for further method invocations (code simplified):
class Service (object) :
def __init__ (self, url=None, session=None) :
self._engine = getEngine ()
self._adaptor = self._engine.get_adaptor (self, 'rs.job.Service', url.scheme, ...,
url, session)
def run_job (self, cmd, host="", ttype=None) :
return self._adaptor.run_job (cmd, host, ttype=ttype)
...
The Engine.get_adaptor
call will iterate through the engine’s adaptor
registry, and will, for all adaptors which indicated support for the given URL
scheme, request an adaptor class instance for the given API class. If an
adaptor class instance can successfully be created, the engine will further
attempt to call the adaptor class’ init_instance
method, which will in fact
construct an adaptor level representation of the API level object:
class GSISSHJobService (rs.cpi.job.Service) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
def init_instance (self, url, session) :
# - check if session contains suitable security tokens for (gsi)ssh
# - check if endpoint given by 'url' can be served
# - establish and cache connection to that endpoint, with the session
# credentials
...
If either the adaptor class instantiation or the init_instance
invocation
raise an exception, the engine will try the next registered adaptor for that API
class / url scheme combo. If both steps are successful, the adaptor class
instance will be returned to the API object’s constructor, as shown above.
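The selection-and-fallback behavior described above can be sketched as a plain function. The registry layout mirrors the simplified example from the previous section; all names here are assumptions for illustration, not the actual Engine internals:

```python
def bind_adaptor(registry, api_obj, cpi_type, scheme, *args):
    """Try each registered adaptor class for the given CPI type and URL
    scheme; fall through to the next one when instantiation or
    init_instance() raises."""
    candidates = registry.get(cpi_type, {}).get(scheme, [])
    for adaptor_class in candidates:
        try:
            instance = adaptor_class(api_obj)
            instance.init_instance(*args)
            return instance                 # first adaptor that works wins
        except Exception:
            continue                        # try the next registered adaptor
    raise RuntimeError("no adaptor found for %s / %s" % (cpi_type, scheme))
```

An adaptor that raises in init_instance() (e.g. because it cannot serve the endpoint) is simply skipped in favor of the next candidate.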
Adaptor State¶
Instances of adaptor classes will often need to share some state. For example,
different instances of rs.job.Job
running via ssh on a specific host may
want to share a single ssh connection; asynchronous operations on a specific
adaptor may want to share a thread pool; adaptor class instances of a specific
resource adaptor may want to share a notification endpoint. State sharing
supports scalability, and can simplify adaptor code – but also adds some
overhead to exchange and synchronize state between those adaptor class
instances.
The preferred way to share state is to use the adaptor instance (as it was created by the engine while loading the adaptor’s module) for state exchange (see section Adaptor Registration) – all adaptor class instances get the spawning adaptor instance passed at creation time:
class GSISSHJobService (rs.cpi.job.Service) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
radical.saga.cpi.Base
will make that instance available as self._adaptor
.
As that adaptor class is part of the adaptor module's code base, and thus under
full control of the adaptor developer, it is straightforward to use it for
state caching and state exchange. Based on the example code in section
Adaptor Structure, a connection caching adaptor class could look like
this:
class Adaptor (rs.cpi.base.AdaptorBase):
__metaclass__ = Singleton
def __init__ (self) :
rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
self._cache = {}
...
...
class GSISSHJobService (rs.cpi.job.Service) :
def __init__ (self, api, adaptor) :
...
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
self._cache = self._adaptor._cache
@SYNC
def init_instance (self, rm_url, session) :
...
if self._rm not in self._cache :
self._cache [self._rm] = setup_connection (self._rm)
@SYNC
def run_job (self, cmd) :
...
connection = self._cache [self._rm]
return connection.run (cmd)
...
The adaptor implementor is responsible for the consistency of the shared state,
and may need to use locking to ensure proper consistency in multithreaded
environments – the self._adaptor
class merely provides a shared container
for the data, nothing else. Also, the Adaptor class' destructor should take
care of freeing the cached / shared state objects (unless another cleanup
mechanism is in place).
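A hedged sketch of such lock-protected sharing is below, assuming the adaptor singleton carries both the cache and a lock (the lock attribute and the get_connection helper are assumptions for illustration, not part of AdaptorBase):

```python
import threading

class Adaptor:
    """Sketch of an adaptor singleton holding lock-protected shared state."""

    def __init__(self):
        self._cache = {}
        self._lock  = threading.RLock()

    def get_connection(self, endpoint, factory):
        # create-or-reuse under the lock, so concurrent adaptor class
        # instances never set up two connections to the same endpoint
        with self._lock:
            if endpoint not in self._cache:
                self._cache[endpoint] = factory(endpoint)
            return self._cache[endpoint]
```

Adaptor class instances would call self._adaptor.get_connection(...) instead of touching the cache dict directly, keeping the locking discipline in one place.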
Creating API Objects¶
Several SAGA API objects feature factory-like methods – examples are
Directory.open()
, job.Service.create_job()/run_job()
, and
resource.Manager.acquire(). To correctly implement those methods on adaptor
. To correctly implement those methods on adaptor
level, adaptors need to be able to instantiate the API objects to return. We
have seen in section Adaptor Binding that, on API object creation, the
Engine
will select and bind a suitable adaptor to the object instance. In
many cases, however, an implementation of a factory-like API method will want to
make sure that the resulting API object is bound to the same adaptor instance as
the spawning adaptor class instance itself. For that purpose, all API object
constructors will accept two additional parameters: _adaptor
(type:
radical.saga.cpi.Base
or derivative), and _adaptor_state
(type:
dict
). This is also provided for API objects which normally have no
public constructor at all:
class Job (rs.attributes.Attributes, rs.task.Async) :
def __init__ (self, _adaptor=None, _adaptor_state={}) :
if not _adaptor :
raise rs.exceptions.IncorrectState ("rs.job.Job constructor is private")
...
# bind to the given adaptor -- this will create the required adaptor
# class. We need to specify a schema for adaptor selection -- and
# simply choose the first one the adaptor offers.
engine = getEngine ()
adaptor_schema = _adaptor.get_schemas()[0]
self._adaptor = engine.bind_adaptor (self, 'rs.job.Job', adaptor_schema,
rs.task.NOTASK, _adaptor, _adaptor_state)
As shown above, _adaptor
and _adaptor_state
are forwarded to the
Engine's bind_adaptor()
method, and if present will ensure that the
resulting API object is bound to the given adaptor. The _adaptor_state
dict
will be forwarded to the adaptor class level init_instance()
call, and can
be used to correctly initialize the state of the new adaptor class. An example
of adaptor level code for creating a radical.saga.job.Job
instance via
radical.saga.job.Service
.create_job()
is below:
class GSISSHJobService (rs.cpi.job.Service) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
@SYNC
def init_instance (self, rm_url, session) :
...
@SYNC
def create_job (self, jd) :
state = { 'job_service' : self,
'job_description' : jd,
'session' : self._session}
return rs.job.Job (_adaptor=self._adaptor, _adaptor_state=state)
class GSISSHJob (rs.cpi.job.Job) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJob')
...
@SYNC
def init_instance (self, job_info):
self._session = job_info['session']
self._jd = job_info['job_description']
self._parent_service = job_info['job_service']
self._id = None # is assigned when calling job.run()
self._state = rs.job.NEW
# register ourselves with the parent service
self._parent_service._update_jobid (self, self._id)
...
Exception Handling¶
RADICAL-SAGA defines a set of exceptions which can be thrown on the various
method invocations (see section api_exceptions). Adaptor implementors
must ensure that the correct exception types are thrown on the corresponding
error conditions. If the API layer encounters a non-SAGA exception from the
adaptor layer, it will convert it to a rs.NoSuccess
exception. While that
will reliably shield the user layer from non-SAGA exception types, it is a lossy
translation, and likely to hide the underlying cause of the error. This feature
is thus to be considered as a safe guard, not as a preferred method of error
state communication!
An example of adaptor level error handling is below:
class ContextX509 (rs.cpi.Context) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'ContextX509')
@SYNC
def init_instance (self, type) :
if not type.lower () in (schema.lower() for schema in _ADAPTOR_SCHEMAS) :
raise rs.exceptions.BadParameter \
("the x509 context adaptor only handles x509 contexts - duh!")
@SYNC
def _initialize (self, session) :
if not self._api.user_proxy :
self._api.user_proxy = "x509up_u%d" % os.getuid() # fallback
if not os.path.exists (self._api.user_proxy) or \
not os.path.isfile (self._api.user_proxy) :
raise rs.exceptions.BadParameter ("X509 proxy does not exist: %s"
% self._api.user_proxy)
try :
fh = open (self._api.user_proxy)
except Exception as e:
raise rs.exceptions.PermissionDenied ("X509 proxy '%s' not readable: %s"
% (self._api.user_proxy, str(e)))
else :
fh.close ()
Asynchronous Methods¶
The SAGA API features several objects which implement both synchronous and
asynchronous versions of their respective methods. Synchronous calls will
return normal objects or values; asynchronous calls will return
radical.saga.Task
instances, which represent the ongoing asynchronous method,
and can later be inspected for state and return values.
On adaptor level, the two method types are distinguished by the method decorators
@SYNC and @ASYNC, like this:
class LocalFile (rs.cpi.filesystem.File) :
def __init__ (self, api, adaptor) :
rs.cpi.Base.__init__ (self, api, adaptor, 'LocalFile')
@SYNC
def init_instance (self, url, flags, session) :
self._url = url
self._flags = flags
self._session = session
...
@ASYNC
def init_instance_async (self, ttype, url, flags, session) :
self._url = url
self._flags = flags
self._session = session
t = rs.task.Task ()
t._set_result (rs.filesystem.File (url, flags, session, _adaptor_name=_ADAPTOR_NAME))
t._set_state (rs.task.DONE)
return t
@SYNC
def get_url (self) :
return self._url
@ASYNC
def get_url_async (self, ttype) :
t = rs.task.Task ()
t._set_result (self._url)
t._set_state (rs.task.DONE)
return t
Note that the async calls in the example code above are not really
asynchronous, as they both return a task which is in Done
state – a proper
async call would return a task in New
or Running
state, without setting
the task's result, and would perform some required work in a separate thread or
process. Upon completion, the adaptor (which should keep a handle on the
created task) would then set the result and state of the task, thus notifying
the application of the completion of the asynchronous method call.
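A minimal sketch of such a properly asynchronous method is below. It returns a task in Running state immediately, performs the work in a thread, and sets result and state on completion. The MiniTask class is a simplified stand-in for radical.saga.Task, not the real API:

```python
import threading

class MiniTask:
    """Stand-in for a SAGA task: state, result, and completion signalling."""

    def __init__(self):
        self.state  = 'Running'
        self.result = None
        self._done  = threading.Event()

    def _set_result(self, result):
        self.result = result

    def _set_state(self, state):
        self.state = state
        if state == 'Done':
            self._done.set()

    def wait(self):
        self._done.wait()            # block until the worker signals 'Done'


def get_url_async(url):
    task = MiniTask()

    def work():
        # ... a potentially slow backend interaction would happen here ...
        task._set_result(url)
        task._set_state('Done')      # completion notifies waiting callers

    threading.Thread(target=work).start()
    return task                      # returned immediately, still Running
```

The caller gets the task back right away and can later wait() on it, mirroring the notification flow described above.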
Also note that there exists an asynchronous version for the init_instance()
method, which is used for the asynchronous API object creation, i.e. on:
import sys
import radical.saga as rs
t = rs.job.Service.create ('ssh://host.net')
t.wait ()
if t.state != rs.task.DONE :
print ("no job service: " + str(t.exception))
sys.exit (0)
job_service = t.get_result ()
job_service.run_job ("touch /tmp/hello_world")
The exact semantics of SAGA’s asynchronous operations is described elsewhere
(see section api_tasks). Relevant for this discussion is to note that
the asynchronous adaptor methods all receive a task type parameter (ttype
)
which determines the state of the task to return: on ttype==rs.task.TASK
,
the returned task should be in New
state; on ttype==rs.task.ASYNC
the
task is in Running
state, and on ttype==rs.task.SYNC
the returned task
is already Done
. It is up to the adaptor implementor to ensure these
semantics – the examples above do not follow them, and are thus incomplete.
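The ttype-to-state mapping described above can be captured in a small helper; the string constants below stand in for rs.task.TASK/ASYNC/SYNC and the SAGA task states, purely for illustration:

```python
# stand-ins for rs.task.TASK / rs.task.ASYNC / rs.task.SYNC
TASK, ASYNC, SYNC = 'TASK', 'ASYNC', 'SYNC'

def initial_task_state(ttype):
    """Map the requested task type to the state the returned task must be in."""
    return {TASK : 'New',       # task created but not yet started
            ASYNC: 'Running',   # task started immediately
            SYNC : 'Done'}[ttype]   # call completed before returning
```

An adaptor honoring these semantics would consult this mapping when constructing the task it hands back.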
Bulk Operations:¶
On API level, there exists no explicit support for bulk operations. Those can,
however, be rendered implicitly, by collecting asynchronous operations in a task
container, and calling run()
on that container:
bulk = rs.task.Container ()
dir_1 = rs.filesystem.Directory ("gridftp://remote.host1.net/")
for i in range (0, 1000) :
src = "gridftp://remote.host1.net/data/file_%04d.dat" % i
tgt = "gridftp://other.hostx.net/share/file_%04d.dat" % i
bulk.add (dir_1.copy (src, tgt, rs.task.TASK))
dir_2 = rs.filesystem.Directory ("ssh://remote.host2.net/")
for i in range (0, 1000) :
src = "ssh://remote.host2.net/data/file_%04d.info" % i
tgt = "ssh://other.hostx.net/share/file_%04d.info" % i
bulk.add (dir_2.copy (src, tgt, rs.task.TASK))
bulk.run ()
bulk.wait (rs.task.ALL)
The above task container gets filled by file copy tasks which are addressing two
different file transfer protocols, and are thus likely mapped to two different
adaptors. The RADICAL-SAGA API implementation will inspect the task container
upon bulk.run()
, and will attempt to sort the contained tasks by adaptor,
i.e. all tasks operating on API objects which bind to the same adaptor instance
(not adaptor class instance) will be lumped together into a task bucket. For
each bucket, the API will then call the respective bulk operation
(container_method
) for that adaptor.
Note that at this point, the task container implementation does not yet know what adaptor class instance to use for the bulk operations. For that purpose, it will inspect
- todo:
Needs completion after generic bulk ops are fixed.
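The bucket sorting described above can be sketched as follows; the task.adaptor back-reference is an assumption for illustration, not the actual container internals:

```python
def sort_into_buckets(tasks):
    """Group tasks by the adaptor instance their API object is bound to,
    so one bulk (container_method) call can be issued per bucket."""
    buckets = {}
    for task in tasks:
        adaptor = task.adaptor          # assumed back-reference to the
        buckets.setdefault(id(adaptor), []).append(task)   # bound adaptor
    return list(buckets.values())
```

Tasks bound to the same adaptor instance end up in the same bucket, regardless of which adaptor class instance they use.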
Adaptor Logging¶
Based on Python's logging
facility, RADICAL-SAGA also supports logging, both
as an internal auditing and debugging mechanism, and as application supporting
capability (see section util_logging). Adaptor implementors are
encouraged to use logging as well – for that purpose, the
radical.saga.cpi.AdaptorBase
and radical.saga.cpi.Base
classes will initialize
a self._logger
member for all adaptor and adaptor class implementations,
respectively.
We advise using the log levels as indicated below:
Log Level – Type of Events Logged

CRITICAL – Only fatal events that will cause the process to abort; these should NEVER happen on adaptor level!
ERROR – Events that will prevent the adaptor from functioning correctly.
WARNING – Events that indicate potential problems or unusual events, and can support application diagnostics.
INFO – Events that support adaptor auditing, inform about backend activities, performance, etc.
DEBUG – Events which support the tracking of adaptor code paths, to support adaptor debugging (lots of output).
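As a hedged example, the guideline above maps onto the stdlib logging API like this (the logger name here is made up; in a real adaptor, self._logger is provided by the CPI base classes):

```python
import logging

logger = logging.getLogger('radical.saga.adaptor.example')

# ERROR: adaptor cannot function correctly anymore
logger.error  ("connection to %s lost, adaptor cannot continue", 'host.net')

# WARNING: unusual event, useful for application diagnostics
logger.warning("retrying connection to %s (attempt %d)", 'host.net', 2)

# INFO: auditing / backend activity
logger.info   ("submitted job %s to backend", 'job.0001')

# DEBUG: code path tracking (lots of output)
logger.debug  ("entering init_instance with url=%s", 'ssh://host.net/')
```

Note that CRITICAL is deliberately absent: per the table above, adaptor code should never emit it.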
External Processes¶
For many adaptor implementations, it is beneficial to interface to external
processes. In particular, all adaptors performing remote interactions over ssh
or gsissh secured connections will likely need to interact with the remote user
shell. The classes radical.saga.utils.pty_process.PTYProcess
and (on top of
PTYProcess) radical.saga.utils.pty_shell.PTYShell
are designed to simplify
those interactions, and at the same time be more performant than, for example,
pexpect.
- class radical.saga.utils.pty_shell.PTYShell(url, session=None, logger=None, cfg=None, posix=True, interactive=True)[source]¶
This class wraps a shell process and runs it as a PTYProcess. The user of this class can start that shell, and run arbitrary commands on it. The shell to be run is expected to be POSIX compliant (bash, dash, sh, ksh, etc.) – in particular, we expect the following features: $?, $!, $#, $*, $@, $$, $PPID, >&, >>, >, <, |, ||, (), &, &&, wait, kill, nohup, shift, export, PS1, and PS2.
Note that PTYShell will change the shell prompts (PS1 and PS2) to simplify output parsing. PS2 will be empty; PS1 will be set to PROMPT-$?-> – that way, the prompt will report the exit value of the last command, saving an extra roundtrip. Users of this class should be careful when setting other prompts – see set_prompt() for more details.
Usage Example:
# start the shell, find its prompt.
self.shell = saga.utils.pty_shell.PTYShell ("ssh://user@rem.host.net/",
                                            contexts, self._logger)

# run a simple shell command, merge stderr with stdout.  $$ is the pid
# of the shell instance.
ret, out, _ = self.shell.run_sync ("mkdir -p /tmp/data.$$/")

# check if mkdir reported success
if ret != 0 :
    raise saga.NoSuccess ("failed to prepare dir (%s)(%s)" % (ret, out))

# stage some data from a local string variable
# into a file on the remote system
self.shell.stage_to_remote (src = pbs_job_script,
                            tgt = "/tmp/data.$$/job_1.pbs")

# check size of staged script (this is actually done on PTYShell level
# already, with no extra hop):
ret, out, _ = self.shell.run_sync ("stat -c '%s' /tmp/data.$$/job_1.pbs")
if ret != 0 :
    raise saga.NoSuccess ("failed to check size (%s)(%s)" % (ret, out))

assert (len(pbs_job_script) == int(out))
Data Staging and Data Management:
The PTYShell class does not only support command execution, but also basic data management: for SSH based shells, it will create a tunneled scp/sftp connection for file staging. Other data management operations (mkdir, size, list, …) are executed either as shell commands, or on the scp/sftp channel (if possible on the data channel, to keep the shell pty free for concurrent command execution). Ssh tunneling is implemented via ssh.v2 ‘ControlMaster’ capabilities (see ssh_config(5)).
For local shells, PTYShell will create an additional shell pty for data management operations.
Asynchronous Notifications:
A third pty process will be created for asynchronous notifications. For that purpose, the shell started on the first channel will create a named pipe, at:
$RADICAL_BASE/.radical/saga/adaptors/shell/async.$$
$$
here represents the pid of the shell process. It will also set the environment variableSAGA_ASYNC_PIPE
to point to that named pipe – any application running on the remote host can write event messages to that pipe, which will be available on the local end (see below). PTYShell leaves it unspecified what format those messages have, but messages are expected to be separated by newlines.An adaptor using PTYShell can subscribe for messages via:
self.pty_shell.subscribe (callback)
where callback is a Python callable. PTYShell will listen on the event channel in a separate thread and invoke that callback on any received message, passing the message text (sans newline) to the callback.
An example usage: the command channel may run the following command line:
( sh -c 'sleep 100 && echo "job $$ done" > $SAGA_ASYNC_PIPE || echo "job $$ fail" > $SAGA_ASYNC_PIPE' ) &
which will return immediately, and send a notification message at job completion.
Note that writes to named pipes are not atomic. From POSIX:
“A write is atomic if the whole amount written in one operation is not interleaved with data from any other process. This is useful when there are multiple writers sending data to a single reader. Applications need to know how large a write request can be expected to be performed atomically. This maximum is called {PIPE_BUF}. This volume of IEEE Std 1003.1-2001 does not say whether write requests for more than {PIPE_BUF} bytes are atomic, but requires that writes of {PIPE_BUF} or fewer bytes shall be atomic.”
Thus the user is responsible for ensuring that either messages are smaller than PIPE_BUF bytes on the remote system (usually at least 1024, on Linux usually 4096), or to lock the pipe on larger writes.
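Python exposes the POSIX minimum for this limit as select.PIPE_BUF (Unix only), so a sender can check a message before writing it to the pipe. The helper below is an illustrative sketch, not part of PTYShell:

```python
import select

def can_write_atomically(message):
    """Return True if a newline-terminated event message (as expected on
    SAGA_ASYNC_PIPE) fits within the POSIX atomic-write guarantee."""
    data = (message + '\n').encode()
    return len(data) <= select.PIPE_BUF   # guaranteed >= 512 by POSIX
```

For messages exceeding this size, the sender would need to lock the pipe instead.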
Automated Restart, Timeouts:
For timeout and restart semantics, please see the documentation to the underlying
saga.utils.pty_process.PTYProcess
class.
- class radical.saga.utils.pty_process.PTYProcess(command, cfg='utils', logger=None)[source]¶
This class spawns a process, providing that child with pty I/O channels – it will maintain stdin, stdout and stderr channels to the child. All write-like operations operate on the stdin, all read-like operations operate on the stdout stream. Data from the stderr stream are at this point redirected to the stdout channel.
Example:
# run an interactive client process
pty = PTYProcess ("/usr/bin/ssh -t localhost")

# check client's I/O for one of the following patterns (prompts),
# then search again.
prompts = ['password\s*:\s*$',
           'want to continue connecting.*\(yes/no\)\s*$',
           '[\$#>]\s*$']

n, match = pty.find (prompts)
while True :
    if n == 0 :
        # found password prompt - tell the secret
        pty.write (b"secret\n")
        n, _ = pty.find (prompts)
    elif n == 1 :
        # found request to accept host key - sure we do... (who checks
        # those keys anyways...?).  Then search again.
        pty.write (b"yes\n")
        n, _ = pty.find (prompts)
    elif n == 2 :
        # found shell prompt!  Wohoo!
        break

while True :
    # go full Dornroeschen (Sleeping Beauty)...
    if not pty.alive (recover=True) :
        break                                # process is gone for good
    pty.find (['[\$#>]\s*$'])                # find shell prompt
    pty.write (b'/bin/sleep "100 years"\n')  # sleep!  SLEEEP!

# something bad happened
print (pty.autopsy ())