4.1. Writing RADICAL-SAGA Adaptors
Note
This part of the RADICAL-SAGA documentation is not for users of RADICAL-SAGA, but rather for implementors of backend adaptors (although it may be beneficial for users to skim over section Adaptor Binding to gain some insight into RADICAL-SAGA’s mode of operation).
4.1.1. Adaptor Structure
A RADICAL-SAGA adaptor is a Python module with a well-defined structure. The module must expose a class Adaptor, which (a) must be a singleton, (b) must provide a sanity_check() method, and (c) must inherit from radical.saga.cpi.AdaptorBase. That base class’ constructor (__init__) must be called like this:
class Adaptor (rs.cpi.AdaptorBase):
    __metaclass__ = Singleton
    def __init__ (self) :
        rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
    def sanity_check (self) :
        # FIXME: detect gsissh tool
        pass
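The `__metaclass__` class attribute used above is Python 2 syntax; Python 3 ignores it and expects the metaclass as a keyword in the class statement. The following is a minimal sketch of the singleton requirement under Python 3. It makes no claim about how RADICAL-SAGA's own Singleton is implemented; `SingletonSketch` and `DemoAdaptor` are hypothetical names used only for illustration.

```python
import threading


class SingletonSketch(type):
    """Hypothetical metaclass: at most one instance per class."""

    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Guard creation so that concurrent first calls yield one instance.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class DemoAdaptor(metaclass=SingletonSketch):
    """Stand-in for an adaptor class; not the RADICAL-SAGA base class."""

    def __init__(self):
        self.cache = {}


first = DemoAdaptor()
second = DemoAdaptor()   # returns the instance created above
```

Both names refer to the same object, so state stored on one is visible through the other.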
_ADAPTOR_INFO is a module-level Python dict, and _ADAPTOR_OPTIONS is a module-level list of dicts, with the following layout:
_ADAPTOR_NAME    = 'rs.adaptor.gsissh.job'
_ADAPTOR_SCHEMAS = ['ssh', 'gsissh']
_ADAPTOR_OPTIONS = [
    {
        'category'      : _ADAPTOR_NAME,
        'name'          : 'cache_connection',
        'type'          : bool,
        'default'       : True,
        'valid_options' : [True, False],
        'documentation' : 'toggle connection caching',
        'env_variable'  : None
    },
]
_ADAPTOR_INFO = {
    'name'    : _ADAPTOR_NAME,
    'version' : 'v0.1',
    'cpis'    : [
        {
            'type'    : 'rs.job.Service',
            'class'   : 'GSISSHJobService',
            'schemas' : _ADAPTOR_SCHEMAS
        },
        {
            'type'    : 'rs.job.Job',
            'class'   : 'GSISSHJob',
            'schemas' : _ADAPTOR_SCHEMAS
        }
    ]
}
(It is beneficial to specify _ADAPTOR_NAME and _ADAPTOR_SCHEMAS separately, as they are used in multiple places, as explained later on.)
The adaptor classes listed in the _ADAPTOR_INFO (in this example, GSISSHJob and GSISSHJobService) are the classes which are actually bound to a SAGA API object, and provide its functionality. For that purpose, those classes must inherit the respective object’s Capability Provider Interface (CPI), as shown in the stub below:
class GSISSHJobService (rs.cpi.job.Service) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
    @SYNC
    def init_instance (self, rm_url, session) :
        self._rm      = rm_url
        self._session = session
The radical.saga.cpi.Base class will make sure that the adaptor classes keep a self._adaptor member, pointing to the adaptor singleton instance (i.e. the module’s Adaptor class instance). It will further initialize a logging module (available as self._logger).
Note that the adaptor class' __init__ does not correspond to the API level object __init__ – instead, the adaptor class construction is a two step process, and the actual constructor semantics are mapped to an init_instance() method, which receives the API level constructor arguments.
4.1.2. Adaptor Registration
Any SAGA adaptor must be registered in the Engine in order to be usable. That process is very simple, and performed by the radical.saga.cpi.AdaptorBase class – so all the adaptor has to take care of is the correct initialization of that base class, as described in Adaptor Structure. The AdaptorBase will forward the _ADAPTOR_INFO to the radical.saga.engine.Engine class, where the adaptor will be added to a registry of adaptor classes, hierarchically sorted like this (simplified):
Engine._adaptor_registry =
{
    'rs.job.Service' :
    {
        'gsissh' : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
        'ssh'    : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
        'gram'   : [rs.adaptors.globus.job.GRAMJobService, ...],
        ...
    },
    'rs.job.Job' :
    {
        'gsissh' : [rs.adaptors.gsissh.job.GSISSHJob, ...],
        'ssh'    : [rs.adaptors.gsissh.job.GSISSHJob, ...],
        'gram'   : [rs.adaptors.globus.job.GRAMJob, ...],
        ...
    },
    ...
}
That registry is searched when the engine binds an adaptor class instance to a SAGA API object instance – see Adaptor Binding.
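How such a registry could be filled from an _ADAPTOR_INFO dict can be sketched as follows. This is an illustration under stated assumptions, not the engine's actual code: the `register` helper is hypothetical, and the registry stores (adaptor name, class name) tuples rather than class objects to keep the example self-contained.

```python
from collections import defaultdict

# Same layout as the _ADAPTOR_INFO example, reduced to the fields needed here.
ADAPTOR_INFO = {
    'name': 'rs.adaptor.gsissh.job',
    'cpis': [
        {'type': 'rs.job.Service', 'class': 'GSISSHJobService',
         'schemas': ['ssh', 'gsissh']},
        {'type': 'rs.job.Job', 'class': 'GSISSHJob',
         'schemas': ['ssh', 'gsissh']},
    ],
}


def register(registry, info):
    """Hypothetical helper: file each CPI class under (API type, schema)."""
    for cpi in info['cpis']:
        for schema in cpi['schemas']:
            registry[cpi['type']][schema].append(
                (info['name'], cpi['class']))


registry = defaultdict(lambda: defaultdict(list))
register(registry, ADAPTOR_INFO)

# All candidates which could serve a 'gsissh://' job service URL:
candidates = registry['rs.job.Service']['gsissh']
```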
4.1.3. Adaptor Binding
Whenever a SAGA API object is created, or whenever any method is invoked on that object, the RADICAL-SAGA implementation needs to (a) select a suitable backend adaptor to perform that operation, and (b) invoke the respective adaptor functionality.
The first part, selecting a specific adaptor for a specific API object instance, is called binding. RADICAL-SAGA binds an adaptor to an object exactly once, at creation time, and the bond remains valid for the lifetime of the API object. On creation, the API object will request a suitable adaptor from the engine, and will keep it for further method invocations (code simplified):
class Service (object) :
    def __init__ (self, url=None, session=None) :
        self._engine  = getEngine ()
        self._adaptor = self._engine.get_adaptor (self, 'rs.job.Service', url.scheme, ...,
                                                  url, session)
    def run_job (self, cmd, host="", ttype=None) :
        return self._adaptor.run_job (cmd, host, ttype=ttype)
    ...
The Engine.get_adaptor call will iterate through the engine’s adaptor registry, and will, for all adaptors which indicated support for the given URL scheme, request an adaptor class instance for the given API class. If an adaptor class instance can successfully be created, the engine will further attempt to call the adaptor class’ init_instance method, which will in fact construct an adaptor level representation of the API level object:
class GSISSHJobService (rs.cpi.job.Service) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
    def init_instance (self, url, session) :
        # - check if session contains suitable security tokens for (gsi)ssh
        # - check if endpoint given by 'url' can be served
        # - establish and cache connection to that endpoint, with the session
        #   credentials
        ...
If either the adaptor class instantiation or the init_instance invocation raises an exception, the engine will try the next registered adaptor for that combination of API class and URL scheme. If both steps are successful, the adaptor class instance will be returned to the API object’s constructor, as shown above.
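The fallback behaviour described above can be sketched with stand-in classes. The code below is an assumption-laden illustration, not the engine's implementation: `get_adaptor` here is a free function with a simplified signature, and `BrokenService` and `WorkingService` are hypothetical adaptor classes.

```python
class BrokenService:
    """Stand-in adaptor class whose init_instance always fails."""

    def __init__(self, api, adaptor):
        self.api = api

    def init_instance(self, url, session):
        raise RuntimeError("cannot serve %s" % url)


class WorkingService:
    """Stand-in adaptor class which accepts any endpoint."""

    def __init__(self, api, adaptor):
        self.api = api

    def init_instance(self, url, session):
        self.url = url
        self.session = session


def get_adaptor(candidates, api, url, session):
    """Hypothetical binding loop: return the first candidate that works."""
    errors = []
    for cls in candidates:
        try:
            instance = cls(api, adaptor=None)       # step 1: construction
            instance.init_instance(url, session)    # step 2: initialization
            return instance
        except Exception as e:
            # Remember the failure and try the next registered candidate.
            errors.append("%s: %s" % (cls.__name__, e))
    raise LookupError("no adaptor found: " + "; ".join(errors))


bound = get_adaptor([BrokenService, WorkingService],
                    api=object(), url='ssh://host.net/', session=None)
```

Collecting the individual error messages is a design choice of this sketch: when no candidate succeeds, the caller sees why each one was rejected.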
4.1.4. Adaptor State
Instances of adaptor classes will often need to share some state. For example, different instances of rs.job.Job running via ssh on a specific host may want to share a single ssh connection; asynchronous operations on a specific adaptor may want to share a thread pool; adaptor class instances of a specific resource adaptor may want to share a notification endpoint. State sharing supports scalability, and can simplify adaptor code – but also adds some overhead to exchange and synchronize state between those adaptor class instances.
The preferred way to share state is to use the adaptor instance (as it was created by the engine while loading the adaptor’s module) for state exchange (see section Adaptor Registration) – all adaptor class instances get the spawning adaptor instance passed at creation time:
class GSISSHJobService (rs.cpi.job.Service) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
radical.saga.cpi.Base will make that instance available as self._adaptor.
As that adaptor class is part of the adaptor module's code base, and thus under full control of the adaptor developer, it is straightforward to use it for state caching and state exchange. Based on the example code in section Adaptor Structure, a connection caching adaptor class could look like this:
class Adaptor (rs.cpi.AdaptorBase):
    __metaclass__ = Singleton
    def __init__ (self) :
        rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
        self._cache = {}
        ...
    ...
class GSISSHJobService (rs.cpi.job.Service) :
    def __init__ (self, api, adaptor) :
        ...
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
        self._cache = self._adaptor._cache
    @SYNC
    def init_instance (self, rm_url, session) :
        ...
        # create and cache a connection only if none exists for this endpoint
        if self._rm not in self._cache :
            self._cache [self._rm] = setup_connection (self._rm)
    @SYNC
    def run_job (self, cmd) :
        ...
        connection = self._cache [self._rm]
        return connection.run (cmd)
    ...
The adaptor implementor is responsible for the consistency of the shared state, and may need to use locking to ensure proper consistency in multithreaded environments – the self._adaptor class merely provides a shared container for the data, nothing else. Also, the Adaptor class' destructor should take care of freeing the cached / shared state objects (unless another cleanup mechanism is in place).
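The locking mentioned above can be sketched with a plain threading.Lock around the cache. This is one possible approach, not a prescribed one; `SharedState`, `get_connection`, `close` and `fake_connect` are hypothetical names, and the "connection" is a dummy object.

```python
import threading


class SharedState:
    """Stand-in for the state an Adaptor singleton might hold."""

    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()

    def get_connection(self, rm_url, factory):
        # Check and insert under one lock, so that two threads asking for
        # the same endpoint cannot both create a connection.
        with self._lock:
            if rm_url not in self._cache:
                self._cache[rm_url] = factory(rm_url)
            return self._cache[rm_url]

    def close(self):
        # Explicit cleanup hook; drops all cached objects.
        with self._lock:
            self._cache.clear()


created = []


def fake_connect(rm_url):
    """Hypothetical connection factory; records each call."""
    created.append(rm_url)
    return object()


state = SharedState()
threads = [threading.Thread(target=state.get_connection,
                            args=('ssh://host.net/', fake_connect))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Although eight threads request the endpoint, the factory is called once. An explicit `close()` is used here instead of a destructor because the time at which Python runs `__del__` is not guaranteed.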
4.1.5. Creating API Objects
Several SAGA API objects feature factory-like methods – examples are Directory.open(), job.Service.create_job()/run_job(), and resource.Manager.acquire(). To correctly implement those methods on adaptor level, adaptors need to be able to instantiate the API objects to return. We have seen in section Adaptor Binding that, on API object creation, the Engine will select and bind a suitable adaptor to the object instance. In many cases, however, an implementation of a factory-like API method will want to make sure that the resulting API object is bound to the same adaptor instance as the spawning adaptor class instance itself. For that purpose, all API object constructors will accept two additional parameters: _adaptor (type: radical.saga.cpi.Base or derivative), and _adaptor_state (type: dict). This is also provided for API objects which normally have no public constructor at all:
class Job (rs.attributes.Attributes, rs.task.Async) :
    def __init__ (self, _adaptor=None, _adaptor_state={}) :
        if not _adaptor :
            raise rs.exceptions.IncorrectState ("rs.job.Job constructor is private")
        ...
        # bind to the given adaptor -- this will create the required adaptor
        # class.  We need to specify a schema for adaptor selection -- and
        # simply choose the first one the adaptor offers.
        engine         = getEngine ()
        adaptor_schema = _adaptor.get_schemas()[0]
        self._adaptor  = engine.bind_adaptor (self, 'rs.job.Job', adaptor_schema,
                                              rs.task.NOTASK, _adaptor, _adaptor_state)
As shown above, _adaptor and _adaptor_state are forwarded to the Engine's bind_adaptor() method, and if present will ensure that the resulting API object is bound to the given adaptor. The _adaptor_state dict will be forwarded to the adaptor class level init_instance() call, and can be used to correctly initialize the state of the new adaptor class. An example of adaptor level code for creating a radical.saga.job.Job instance via radical.saga.job.Service.create_job() is below:
class GSISSHJobService (rs.cpi.job.Service) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
    @SYNC
    def init_instance (self, rm_url, session) :
        ...
    @SYNC
    def create_job (self, jd) :
        state = { 'job_service'     : self,
                  'job_description' : jd,
                  'session'         : self._session}
        return rs.job.Job (_adaptor=self._adaptor, _adaptor_state=state)
class GSISSHJob (rs.cpi.job.Job) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJob')
        ...
    @SYNC
    def init_instance (self, job_info):
        self._session        = job_info['session']
        self._jd             = job_info['job_description']
        self._parent_service = job_info['job_service']
        self._id             = None # is assigned when calling job.run()
        self._state          = rs.job.NEW
        # register ourselves with the parent service
        self._parent_service._update_jobid (self, self._id)
        ...
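The factory pattern of this section, reduced to its core, might look like the sketch below. It runs without RADICAL-SAGA and leaves out the engine entirely; `SketchJob` and `SketchService` are hypothetical stand-ins, and the adaptor is represented by a plain string.

```python
class SketchJob:
    """Stand-in API object with a private constructor."""

    def __init__(self, _adaptor=None, _adaptor_state=None):
        if _adaptor is None:
            raise RuntimeError("SketchJob constructor is private")
        # Bind to the given adaptor and keep a copy of the state dict.
        self._adaptor = _adaptor
        self._state = dict(_adaptor_state or {})


class SketchService:
    """Stand-in for an adaptor class with a factory-like method."""

    def __init__(self, adaptor):
        self._adaptor = adaptor

    def create_job(self, jd):
        # Pass on everything the new object needs for its initialization.
        state = {'job_service': self, 'job_description': jd}
        return SketchJob(_adaptor=self._adaptor, _adaptor_state=state)


svc = SketchService(adaptor='gsissh-adaptor')
job = svc.create_job({'executable': '/bin/date'})
```

The sketch uses None as the default for `_adaptor_state` rather than `{}`: a mutable default argument is created once and then shared between all calls, which is easy to trip over when the callee modifies it.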
4.1.6. Exception Handling
RADICAL-SAGA defines a set of exceptions which can be thrown on the various method invocations (see section api_exceptions). Adaptor implementors must ensure that the correct exception types are thrown on the corresponding error conditions. If the API layer encounters a non-SAGA exception from the adaptor layer, it will convert it to an rs.NoSuccess exception. While that will reliably shield the user layer from non-SAGA exception types, it is a lossy translation, and likely to hide the underlying cause of the error. This feature is thus to be considered a safeguard, not a preferred method of error state communication!
An example of adaptor level error handling is below:
class ContextX509 (rs.cpi.Context) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'ContextX509')
    @SYNC
    def init_instance (self, type) :
        if not type.lower () in (schema.lower() for schema in _ADAPTOR_SCHEMAS) :
            raise rs.exceptions.BadParameter \
                  ("the x509 context adaptor only handles x509 contexts - duh!")
    @SYNC
    def _initialize (self, session) :
        if not self._api.user_proxy :
            self._api.user_proxy = "x509up_u%d" % os.getuid()   # fallback
        if not os.path.exists (self._api.user_proxy) or \
           not os.path.isfile (self._api.user_proxy) :
            raise rs.exceptions.BadParameter ("X509 proxy does not exist: %s"
                                              % self._api.user_proxy)
        try :
            fh = open (self._api.user_proxy)
        except Exception as e:
            raise rs.exceptions.PermissionDenied ("X509 proxy '%s' not readable: %s"
                                                  % (self._api.user_proxy, str(e)))
        else :
            fh.close ()
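The safeguard described at the start of this section, where the API layer converts non-SAGA exceptions, can be illustrated with a small decorator. This is a sketch of the idea only: the exception classes below are stand-ins for the rs.exceptions types, and `shield` is a hypothetical name, not a RADICAL-SAGA function.

```python
import functools


class SagaError(Exception):
    """Stand-in for the SAGA exception base class."""


class BadParameter(SagaError):
    """Stand-in for a specific SAGA exception type."""


class NoSuccess(SagaError):
    """Stand-in for the catch-all SAGA exception type."""


def shield(method):
    """Hypothetical API-layer wrapper around an adaptor call."""
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        try:
            return method(*args, **kwargs)
        except SagaError:
            raise                      # already a SAGA exception: keep type
        except Exception as e:
            # Lossy fallback: callers only see the generic NoSuccess type.
            raise NoSuccess("%s: %s" % (type(e).__name__, e)) from e
    return wrapper


@shield
def good_adaptor_call(path):
    if not path:
        raise BadParameter("no path given")
    return path


@shield
def sloppy_adaptor_call(path):
    return open(path)                  # may raise a plain OSError
```

The first function reports a precise error type; the second leaks an OSError, which the caller only sees as NoSuccess. That difference is why adaptors should raise the specific SAGA exception themselves.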
4.1.7. Asynchronous Methods
The SAGA API features several objects which implement both synchronous and asynchronous versions of their respective methods. Synchronous calls will return normal objects or values; asynchronous calls will return radical.saga.Task instances, which represent the ongoing asynchronous method, and can later be inspected for state and return values.
On adaptor level, both method types are distinguished by the method decorators @SYNC and @ASYNC, like this:
class LocalFile (rs.cpi.filesystem.File) :
    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'LocalFile')
    @SYNC
    def init_instance (self, url, flags, session) :
        self._url     = url
        self._flags   = flags
        self._session = session
        ...
    @ASYNC
    def init_instance_async (self, ttype, url, flags, session) :
        self._url     = url
        self._flags   = flags
        self._session = session
        t = rs.task.Task ()
        t._set_result (rs.filesystem.File (url, flags, session, _adaptor_name=_ADAPTOR_NAME))
        t._set_state  (rs.task.DONE)
        return t
    @SYNC
    def get_url (self) :
        return self._url
    @ASYNC
    def get_url_async (self, ttype) :
        t = rs.task.Task ()
        t._set_result (self._url)
        t._set_state  (rs.task.DONE)
        return t
Note that the async calls in the example code above are not really asynchronous, as they both return a task which is in Done state – a proper async call would return a task in New or Running state, without setting the task's result, and would perform some required work in a separate thread or process. Upon completion, the adaptor (which should keep a handle on the created task) would then set the result and state of the task, thus notifying the application of the completion of the asynchronous method call.
Also note that there exists an asynchronous version for the init_instance() method, which is used for the asynchronous API object creation, i.e. on:
import sys
import radical.saga as rs
t = rs.job.Service.create ('ssh://host.net')
t.wait ()
if t.state != rs.task.DONE :
    print ("no job service: " + str(t.exception))
    sys.exit (1)   # non-zero exit code signals the failure
job_service = t.get_result ()
job_service.run_job ("touch /tmp/hello_world")
The exact semantics of SAGA’s asynchronous operations are described elsewhere (see section api_tasks). Relevant for this discussion is that the asynchronous adaptor methods all receive a task type parameter (ttype) which determines the state of the task to return: on ttype==rs.task.TASK, the returned task should be in New state; on ttype==rs.task.ASYNC the task is in Running state, and on ttype==rs.task.SYNC the returned task is already Done. It is up to the adaptor implementor to ensure those semantics – the examples above do not follow them, and are thus incomplete.
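A sketch of an adaptor-side helper which honours the ttype semantics described above is shown below. It does not use the rs.task classes: the constants, `SketchTask` and `make_task` are hypothetical stand-ins, and a thread is only one of several ways to run the work in the background.

```python
import threading

# Stand-ins for the rs.task constants and task states used in the text.
SYNC, ASYNC, TASK = 'sync', 'async', 'task'
NEW, RUNNING, DONE = 'New', 'Running', 'Done'


class SketchTask:
    """Hypothetical task: runs a callable and stores its result."""

    def __init__(self, work):
        self._work = work
        self._thread = None
        self.state = NEW
        self.result = None

    def _execute(self):
        self.result = self._work()
        self.state = DONE

    def run(self):
        self.state = RUNNING
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()

    def wait(self):
        if self._thread:
            self._thread.join()


def make_task(ttype, work):
    """Return a task in the state which the given ttype asks for."""
    task = SketchTask(work)
    if ttype == ASYNC:
        task.run()                 # started in the background
    elif ttype == SYNC:
        task.run()
        task.wait()                # completed before being returned
    return task                    # ttype == TASK: left untouched (New)
```

A `get_url_async` implementation along these lines would return `make_task(ttype, lambda: self._url)` instead of always handing back a finished task.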
4.1.8. Bulk Operations
On API level, there exists no explicit support for bulk operations. Those can, however, be rendered implicitly, by collecting asynchronous operations in a task container, and calling run() on that container:
bulk  = rs.task.Container ()
dir_1 = rs.filesystem.Directory ("gridftp://remote.host1.net/")
for i in range (0, 1000) :
    # %04d zero-pads the index, so file names contain no spaces
    src = "gridftp://remote.host1.net/data/file_%04d.dat"  % i
    tgt = "gridftp://other.hostx.net/share/file_%04d.dat"  % i
    bulk.add (dir_1.copy (src, tgt, rs.task.TASK))
dir_2 = rs.filesystem.Directory ("ssh://remote.host2.net/")
for i in range (0, 1000) :
    src = "ssh://remote.host2.net/data/file_%04d.info" % i
    tgt = "ssh://other.hostx.net/share/file_%04d.info" % i
    bulk.add (dir_2.copy (src, tgt, rs.task.TASK))
bulk.run  ()
bulk.wait (rs.task.ALL)
The above task container gets filled by file copy tasks which are addressing two different file transfer protocols, and are thus likely mapped to two different adaptors. The RADICAL-SAGA API implementation will inspect the task container upon bulk.run(), and will attempt to sort the contained tasks by adaptor, i.e. all tasks operating on API objects which bind to the same adaptor instance (not adaptor class instance) will be lumped together into a task bucket. For each bucket, the API will then call the respective bulk operation (container_method) for that adaptor.
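The sorting of tasks into per-adaptor buckets can be sketched as a simple grouping step. The code is illustrative and assumes that each task keeps a reference to its adaptor instance; `FakeTask` and `bucket_by_adaptor` are hypothetical names.

```python
from collections import defaultdict


class FakeTask:
    """Stand-in for a task which remembers the adaptor it is bound to."""

    def __init__(self, name, adaptor):
        self.name = name
        self.adaptor = adaptor


def bucket_by_adaptor(tasks):
    """Group tasks so that each bucket shares one adaptor instance."""
    buckets = defaultdict(list)
    for task in tasks:
        # id() keys on object identity: same instance, same bucket.
        buckets[id(task.adaptor)].append(task)
    return list(buckets.values())


gridftp_adaptor = object()
ssh_adaptor = object()
tasks = [FakeTask('copy_a', gridftp_adaptor),
         FakeTask('copy_b', ssh_adaptor),
         FakeTask('copy_c', gridftp_adaptor)]

buckets = bucket_by_adaptor(tasks)
names = [[t.name for t in bucket] for bucket in buckets]
```

Here `copy_a` and `copy_c` end up in one bucket and `copy_b` in another; each bucket could then be handed to its adaptor in a single bulk call.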
Note that at this point, the task container implementation does not yet know what adaptor class instance to use for the bulk operations. For that purpose, it will inspect
- todo:
Needs completion after generic bulk ops are fixed.
4.1.9. Adaptor Logging
Based on Python's logging facility, RADICAL-SAGA also supports logging, both as an internal auditing and debugging mechanism, and as an application supporting capability (see section util_logging). Adaptor implementors are encouraged to use logging as well – for that purpose, the radical.saga.cpi.AdaptorBase and radical.saga.cpi.Base classes will initialize a self._logger member for all adaptor and adaptor class implementations, respectively.
We advise using the log levels as indicated below:
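=========  ========================================================================================================
Log Level  Type of Events Logged
=========  ========================================================================================================
CRITICAL   Only fatal events that will cause the process to abort – these should NEVER happen on adaptor level!
ERROR      Events that will prevent the adaptor from functioning correctly.
WARNING    Events that indicate potential problems or unusual events, and can support application diagnostics.
INFO       Events that support adaptor auditing, inform about backend activities, performance etc.
DEBUG      Events which support the tracking of adaptor code paths, to support adaptor debugging (lots of output).
=========  ========================================================================================================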
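How the levels above might be used in adaptor code is sketched below with Python's standard logging module. The logger name, the `connect` function and the retry threshold are made up for illustration; inside an adaptor, self._logger would take the place of `logger`.

```python
import logging

logger = logging.getLogger('rs.adaptor.sketch')   # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False


class ListHandler(logging.Handler):
    """Collect (level, message) pairs instead of printing them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


handler = ListHandler()
logger.addHandler(handler)


def connect(host, retries):
    """Hypothetical adaptor routine showing one message per level."""
    logger.debug("entering connect(%s)", host)              # code path
    if retries > 3:
        logger.warning("unusually many retries: %d", retries)
    logger.info("connected to %s", host)                    # backend activity


connect("host.net", retries=5)
```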
4.1.10. External Processes
For many adaptor implementations, it is beneficial to interface to external processes. In particular, all adaptors performing remote interactions over ssh or gsissh secured connections will likely need to interact with the remote user shell. The classes radical.saga.utils.pty_process.PTYProcess and (on top of PTYProcess) radical.saga.utils.pty_shell.PTYShell are designed to simplify those interactions, and at the same time be more performant than, for example, pexpect.
- class radical.saga.utils.pty_shell.PTYShell(url, session=None, logger=None, cfg=None, posix=True, interactive=True)
This class wraps a shell process and runs it as a PTYProcess. The user of this class can start that shell, and run arbitrary commands on it.
The shell to be run is expected to be POSIX compliant (bash, dash, sh, ksh etc.) – in particular, we expect the following features: $?, $!, $#, $*, $@, $$, $PPID, >&, >>, >, <, |, ||, (), &, &&, wait, kill, nohup, shift, export, PS1, and PS2.
Note that PTYShell will change the shell prompts (PS1 and PS2), to simplify output parsing. PS2 will be empty, PS1 will be set to PROMPT-$?-> – that way, the prompt will report the exit value of the last command, saving an extra roundtrip. Users of this class should be careful when setting other prompts – see set_prompt() for more details.
Usage Example:
# start the shell, find its prompt.
self.shell = saga.utils.pty_shell.PTYShell("ssh://user@rem.host.net/",
                                           contexts, self._logger)
# run a simple shell command, merge stderr with stdout.  $$ is the pid
# of the shell instance.
ret, out, _ = self.shell.run_sync (" mkdir -p /tmp/data.$$/" )
# check if mkdir reported success
if ret != 0 :
    raise saga.NoSuccess ("failed to prepare dir (%s)(%s)" % (ret, out))
# stage some data from a local string variable
# into a file on the remote system
self.shell.stage_to_remote (src = pbs_job_script,
                            tgt = "/tmp/data.$$/job_1.pbs")
# check size of staged script (this is actually done on PTYShell level
# already, with no extra hop):
ret, out, _ = self.shell.run_sync("stat -c '%s' /tmp/data.$$/job_1.pbs")
if ret != 0 :
    raise saga.NoSuccess ("failed to check size (%s)(%s)" % (ret, out))
assert (len(pbs_job_script) == int(out))
Data Staging and Data Management:
The PTYShell class does not only support command execution, but also basic data management: for SSH based shells, it will create a tunneled scp/sftp connection for file staging. Other data management operations (mkdir, size, list, …) are executed either as shell commands, or on the scp/sftp channel (if possible on the data channel, to keep the shell pty free for concurrent command execution). Ssh tunneling is implemented via ssh.v2 ‘ControlMaster’ capabilities (see ssh_config(5)).
For local shells, PTYShell will create an additional shell pty for data management operations.
Asynchronous Notifications:
A third pty process will be created for asynchronous notifications. For that purpose, the shell started on the first channel will create a named pipe, at:
$RADICAL_BASE/.radical/saga/adaptors/shell/async.$$
$$ here represents the pid of the shell process. It will also set the environment variable SAGA_ASYNC_PIPE to point to that named pipe – any application running on the remote host can write event messages to that pipe, which will be available on the local end (see below). PTYShell leaves it unspecified what format those messages have, but messages are expected to be separated by newlines.
An adaptor using PTYShell can subscribe for messages via:
self.pty_shell.subscribe (callback)
where callback is a Python callable. PTYShell will listen on the event channel in a separate thread and invoke that callback on any received message, passing the message text (sans newline) to the callback.
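The listening side of such a notification channel can be sketched without PTYShell. The example below is an assumption-based illustration: an anonymous os.pipe() stands in for the named pipe, `listen` is a hypothetical function, and the message texts are invented.

```python
import os
import threading


def listen(read_fd, callback):
    """Read newline-separated messages from read_fd until EOF."""
    with os.fdopen(read_fd, 'r') as channel:
        for line in channel:
            callback(line.rstrip('\n'))    # message text without newline


received = []
read_fd, write_fd = os.pipe()              # stand-in for the named pipe
listener = threading.Thread(target=listen, args=(read_fd, received.append))
listener.start()

# The "remote application" writes two events, then closes its end.
os.write(write_fd, b"job 4711 done\njob 4712 fail\n")
os.close(write_fd)
listener.join()
```

The callback runs in the listener thread, so a callback which touches shared adaptor state needs the same care as any other multithreaded code.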
An example usage: the command channel may run the following command line:
( sh -c 'sleep 100 && echo "job $$ done" > "$SAGA_ASYNC_PIPE" || echo "job $$ fail" > "$SAGA_ASYNC_PIPE"' ) &
which will return immediately, and send a notification message at job completion.
Note that writes to named pipes are only guaranteed to be atomic up to a certain size. From POSIX:
"A write is atomic if the whole amount written in one operation is not interleaved with data from any other process. This is useful when there are multiple writers sending data to a single reader. Applications need to know how large a write request can be expected to be performed atomically. This maximum is called {PIPE_BUF}. This volume of IEEE Std 1003.1-2001 does not say whether write requests for more than {PIPE_BUF} bytes are atomic, but requires that writes of {PIPE_BUF} or fewer bytes shall be atomic."
Thus the user is responsible for ensuring either that messages are no larger than PIPE_BUF bytes on the remote system (POSIX requires at least 512; on Linux it is 4096), or that the pipe is locked for larger writes.
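A size check along these lines could look like the sketch below. Note the assumptions: it reads the PIPE_BUF value of the local system via Python's select module, whereas the limit that matters is the one on the host where the pipe lives, and `is_atomic_write` is a hypothetical helper.

```python
import select

# select.PIPE_BUF is only defined on Unix; 512 is the POSIX minimum.
PIPE_BUF = getattr(select, 'PIPE_BUF', 512)


def is_atomic_write(message):
    """True if a single write() of this message stays within PIPE_BUF."""
    return len(message.encode('utf-8')) + 1 <= PIPE_BUF   # +1 for newline


short_ok = is_atomic_write("job 4711 done")
long_ok = is_atomic_write("x" * (PIPE_BUF + 1))
```

The length is measured in encoded bytes rather than characters, since the limit applies to the bytes handed to write().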
Automated Restart, Timeouts:
For timeout and restart semantics, please see the documentation of the underlying saga.utils.pty_process.PTYProcess class.
- class radical.saga.utils.pty_process.PTYProcess(command, cfg='utils', logger=None)
This class spawns a process, providing that child with pty I/O channels – it will maintain stdin, stdout and stderr channels to the child. All write-like operations operate on the stdin, all read-like operations operate on the stdout stream. Data from the stderr stream are at this point redirected to the stdout channel.
Example:
# run an interactive client process
pty = PTYProcess ("/usr/bin/ssh -t localhost")
# patterns (prompts) to look for in the client's I/O
prompts = [r'password\s*:\s*$',
           r'want to continue connecting.*\(yes/no\)\s*$',
           r'[\$#>]\s*$']
# check client's I/O for one of the patterns
n, match = pty.find (prompts)
while True :
    if n == 0 :
        # found password prompt - tell the secret.  Then search again.
        pty.write(b"secret\n")
        n, _ = pty.find (prompts)
    elif n == 1 :
        # found request to accept host key - sure we do... (who checks
        # those keys anyways...?).  Then search again.
        pty.write(b"yes\n")
        n, _ = pty.find (prompts)
    elif n == 2 :
        # found shell prompt!  Wohoo!
        break
while True :
    # go full Dornroeschen (Sleeping Beauty)...
    if not pty.alive (recover=True) :         # check / restart process
        break
    pty.find  ([r'[\$#>]\s*$'])               # find shell prompt
    pty.write (b'/bin/sleep "100 years"\n')   # sleep!  SLEEEP!
# something bad happened
print(pty.autopsy ())