4.1. Writing RADICAL-SAGA Adaptors

Note

This part of the RADICAL-SAGA documentation is not for users of RADICAL-SAGA, but rather for implementors of backend adaptors (although it may be beneficial for users to skim over section Adaptor Binding to gain some insight into RADICAL-SAGA’s mode of operation).

4.1.1. Adaptor Structure

A RADICAL-SAGA adaptor is a Python module with a well-defined structure. The module must expose a class Adaptor, which (a) must be a singleton, (b) must provide a sanity_check() method, and (c) must inherit from radical.saga.cpi.AdaptorBase. That base class’ constructor (__init__) must be called like this:

class Adaptor (rs.cpi.AdaptorBase):

  __metaclass__ = Singleton

  def __init__ (self) :
    rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)


  def sanity_check (self) :
    # FIXME: detect gsissh tool
    pass
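The __metaclass__ attribute used above is Python 2 syntax; Python 3 ignores it and expects the metaclass keyword in the class statement instead. The following is a minimal sketch of how the singleton requirement could be met under Python 3. The Singleton metaclass and the AdaptorBase class shown here are stand-ins written for illustration only (assumptions, not the classes shipped with RADICAL-SAGA):

```python
import threading


class Singleton(type):
    """Illustrative metaclass: hands out one shared instance per class."""

    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # create the instance on first use, return the cached one afterwards
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class AdaptorBase(object):
    """Dummy placeholder for radical.saga.cpi.AdaptorBase (assumption)."""

    def __init__(self, info, options):
        self._info = info
        self._options = options


class Adaptor(AdaptorBase, metaclass=Singleton):

    def __init__(self):
        # hypothetical info / options, just enough to run the sketch
        AdaptorBase.__init__(self, {'name': 'demo.adaptor'}, [])

    def sanity_check(self):
        pass


first = Adaptor()
second = Adaptor()
```

Both names refer to the same object, so any state stored on the adaptor instance is visible to every caller.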

_ADAPTOR_INFO (a dict) and _ADAPTOR_OPTIONS (a list of dicts) are module-level Python objects with the following layout:

_ADAPTOR_NAME          = 'rs.adaptor.gsissh.job'
_ADAPTOR_SCHEMAS       = ['ssh', 'gsissh']
_ADAPTOR_OPTIONS       = [
  {
  'category'         : _ADAPTOR_NAME,
  'name'             : 'cache_connection',
  'type'             : bool,
  'default'          : True,
  'valid_options'    : [True, False],
  'documentation'    : 'toggle connection caching',
  'env_variable'     : None
  },
]
_ADAPTOR_INFO          = {
  'name'             : _ADAPTOR_NAME,
  'version'          : 'v0.1',
  'cpis'             : [
    {
    'type'         : 'rs.job.Service',
    'class'        : 'GSISSHJobService',
    'schemas'      : _ADAPTOR_SCHEMAS
    },
    {
    'type'         : 'rs.job.Job',
    'class'        : 'GSISSHJob',
    'schemas'      : _ADAPTOR_SCHEMAS
    }
  ]
}

(It is beneficial to specify _ADAPTOR_NAME and _ADAPTOR_SCHEMAS separately, as they are used in multiple places, as explained later on.)

The adaptor classes listed in the _ADAPTOR_INFO (in this example, GSISSHJob and GSISSHJobService) are the classes which are actually bound to a SAGA API object, and provide its functionality. For that purpose, those classes must inherit the respective object’s Capability Provider Interface (CPI), as shown in the stub below:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')


  @SYNC
  def init_instance (self, rm_url, session) :
    self._rm      = rm_url
    self._session = session

The radical.saga.cpi.Base class will make sure that the adaptor classes keep a self._adaptor member, pointing to the adaptor singleton instance (i.e. the module’s Adaptor class instance). It will further initialize a logging module (available as self._logger).

Note that the adaptor class' __init__ does not correspond to the API level object __init__ – instead, the adaptor class construction is a two step process, and the actual constructor semantics is mapped to an init_instance() method, which receives the API level constructor arguments.

4.1.2. Adaptor Registration

Any SAGA adaptor must be registered with the Engine in order to be usable. That process is simple, and is performed by the radical.saga.cpi.AdaptorBase class – so all the adaptor has to take care of is the correct initialization of that base class, as described in Adaptor Structure. The AdaptorBase will forward the _ADAPTOR_INFO to the radical.saga.engine.Engine class, where the adaptor will be added to a registry of adaptor classes, hierarchically sorted like this (simplified):

Engine._adaptor_registry = {
  'rs.job.Service' :
  {
    'gsissh' : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
    'ssh'    : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
    'gram'   : [rs.adaptors.globus.job.GRAMJobService, ...],
    ...
  },
  'rs.job.Job' :
  {
    'gsissh' : [rs.adaptors.gsissh.job.GSISSHJob, ...],
    'ssh'    : [rs.adaptors.gsissh.job.GSISSHJob, ...],
    'gram'   : [rs.adaptors.globus.job.GRAMJob, ...],
    ...
  },
  ...
}

That registry is searched when the engine binds an adaptor class instance to a SAGA API object instance – see Adaptor Binding.
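To make the registry layout more concrete, here is a small sketch of how an _ADAPTOR_INFO dict could be folded into such a nested structure. The register() helper is hypothetical and is not the Engine's actual code; for simplicity it stores class names (strings) instead of class objects:

```python
from collections import defaultdict


def register(registry, adaptor_info):
    """Add each CPI class of one adaptor under registry[type][schema]."""
    for cpi in adaptor_info['cpis']:
        for schema in cpi['schemas']:
            registry[cpi['type']][schema].append(cpi['class'])


# registry layout: API type -> URL schema -> list of adaptor classes
registry = defaultdict(lambda: defaultdict(list))

schemas = ['ssh', 'gsissh']
info = {
    'name'    : 'rs.adaptor.gsissh.job',
    'version' : 'v0.1',
    'cpis'    : [
        {'type': 'rs.job.Service', 'class': 'GSISSHJobService', 'schemas': schemas},
        {'type': 'rs.job.Job',     'class': 'GSISSHJob',        'schemas': schemas},
    ],
}

register(registry, info)
```

A lookup such as registry['rs.job.Service']['ssh'] then yields the candidate classes for that API type and URL schema, in registration order.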

4.1.3. Adaptor Binding

Whenever a SAGA API object is created, or whenever any method is invoked on that object, the RADICAL-SAGA implementation needs to (a) select a suitable backend adaptor to perform that operation, and (b) invoke the respective adaptor functionality.

The first part, selecting a specific adaptor for a specific API object instance, is called binding – RADICAL-SAGA binds an adaptor to an object exactly once, at creation time, and the bond remains valid for the lifetime of the API object: on API creation, the API object will request a suitable adaptor from the engine, and will keep it for further method invocations (code simplified):

class Service (object) :

  def __init__ (self, url=None, session=None) :
    self._engine  = getEngine ()
    self._adaptor = self._engine.get_adaptor (self, 'rs.job.Service', url.scheme, ...,
                                                url, session)

  def run_job (self, cmd, host="", ttype=None) :
    return self._adaptor.run_job (cmd, host, ttype=ttype)

  ...

The Engine.get_adaptor call will iterate through the engine’s adaptor registry, and will, for all adaptors which indicated support for the given URL scheme, request an adaptor class instance for the given API class. If an adaptor class instance can successfully be created, the engine will further attempt to call the adaptor class’ init_instance method, which will in fact construct an adaptor level representation of the API level object:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

  def init_instance (self, url, session) :
    # - check if session contains suitable security tokens for (gsi)ssh
    # - check if endpoint given by 'url' can be served
    # - establish and cache connection to that endpoint, with the session
    #   credentials
    ...

If either the adaptor class instantiation or the init_instance invocation raises an exception, the engine will try the next registered adaptor for that API class / URL scheme combination. If both steps are successful, the adaptor class instance will be returned to the API object’s constructor, as shown above.
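The fallback behaviour described here can be sketched as a simple loop over candidate classes. Everything in this block is illustrative: get_adaptor(), the two service classes, and the local NoSuccess exception are stand-ins, and raising NoSuccess once all candidates have failed is an assumption about what a reasonable engine would do:

```python
class NoSuccess(Exception):
    """Stand-in for the SAGA NoSuccess exception (assumption)."""


def get_adaptor(candidates, api, *init_args):
    """Return the first adaptor class instance that can be created and initialized."""
    errors = []
    for cls in candidates:
        try:
            instance = cls(api)                  # step 1: construct
            instance.init_instance(*init_args)   # step 2: initialize
            return instance
        except Exception as e:
            # remember the failure and try the next registered adaptor
            errors.append('%s: %s' % (cls.__name__, e))
    raise NoSuccess('no adaptor could be bound: ' + '; '.join(errors))


class BrokenService(object):
    def __init__(self, api):
        self._api = api

    def init_instance(self, url):
        raise RuntimeError('cannot serve %s' % url)


class WorkingService(object):
    def __init__(self, api):
        self._api = api

    def init_instance(self, url):
        self._url = url


bound = get_adaptor([BrokenService, WorkingService], None, 'ssh://host.net/')
```

Collecting the individual error messages keeps the cause of each failed binding attempt visible if no adaptor succeeds.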

4.1.4. Adaptor State

Instances of adaptor classes will often need to share some state. For example, different instances of rs.job.Job running via ssh on a specific host may want to share a single ssh connection; asynchronous operations on a specific adaptor may want to share a thread pool; adaptor class instances of a specific resource adaptor may want to share a notification endpoint. State sharing supports scalability, and can simplify adaptor code – but also adds some overhead to exchange and synchronize state between those adaptor class instances.

The preferred way to share state is to use the adaptor instance (as it was created by the engine while loading the adaptor’s module) for state exchange (see section Adaptor Registration). All adaptor class instances get the spawning adaptor instance passed at creation time:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

radical.saga.cpi.Base will make that instance available as self._adaptor. As that adaptor class is part of the adaptor module's code base, and thus under full control of the adaptor developer, it is straightforward to use it for state caching and state exchange. Based on the example code in section Adaptor Structure, a connection caching adaptor class could look like this:

class Adaptor (rs.cpi.AdaptorBase):

  __metaclass__ = Singleton

  def __init__ (self) :
    rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
    self._cache = {}
    ...
  ...


class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    ...
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
    self._cache = self._adaptor._cache


  @SYNC
  def init_instance (self, rm_url, session) :
    ...
    if self._rm not in self._cache :
      self._cache [self._rm] = setup_connection (self._rm)


  @SYNC
  def run_job (self, cmd) :
    ...
    connection = self._cache [self._rm]
    return connection.run (cmd)
  ...

The adaptor implementor is responsible for the consistency of the shared state, and may need to use locking to ensure proper consistency in multithreaded environments – the adaptor instance (self._adaptor) merely provides a shared container for the data, nothing else. Also, the Adaptor class' destructor should take care of freeing the cached / shared state objects (unless another cleanup mechanism is in place).
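One possible way to keep such a shared cache consistent across threads is to guard it with a lock. The sketch below assumes a hypothetical ConnectionCache helper and a fake connection factory; neither is part of RADICAL-SAGA:

```python
import threading


class ConnectionCache(object):
    """Illustrative thread-safe cache, e.g. held by the adaptor singleton."""

    def __init__(self, factory):
        self._factory = factory          # callable which creates a connection
        self._lock = threading.Lock()
        self._cache = {}

    def get(self, key):
        # check and create under the same lock, so that two threads
        # cannot both set up a connection for the same key
        with self._lock:
            if key not in self._cache:
                self._cache[key] = self._factory(key)
            return self._cache[key]

    def close_all(self):
        # cleanup hook, e.g. to be called when the adaptor is torn down
        with self._lock:
            self._cache.clear()


created = []


def fake_connect(url):
    """Stand-in for a real connection setup; records each call."""
    created.append(url)
    return object()


cache = ConnectionCache(fake_connect)
threads = [threading.Thread(target=cache.get, args=('ssh://host.net/',))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Although eight threads ask for the same endpoint, the factory runs only once. Holding the lock during connection setup is the simplest choice, but it serializes setup for unrelated endpoints too; per-key locks would avoid that at the cost of more code.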

4.1.5. Creating API Objects

Several SAGA API objects feature factory-like methods – examples are Directory.open(), job.Service.create_job()/run_job(), and resource.Manager.acquire(). To correctly implement those methods on adaptor level, adaptors need to be able to instantiate the API objects to return. We have seen in section Adaptor Binding that, on API object creation, the Engine will select and bind a suitable adaptor to the object instance. In many cases, however, an implementation of a factory-like API method will want to make sure that the resulting API object is bound to the same adaptor instance as the spawning adaptor class instance itself. For that purpose, all API object constructors will accept two additional parameters: _adaptor (type: radical.saga.cpi.Base or derivative), and _adaptor_state (type: dict). This is also provided for API objects which normally have no public constructor at all:

class Job (rs.attributes.Attributes, rs.task.Async) :

  def __init__ (self, _adaptor=None, _adaptor_state={}) :

    if not _adaptor :
        raise rs.exceptions.IncorrectState ("rs.job.Job constructor is private")
    ...

    # bind to the given adaptor -- this will create the required adaptor
    # class.  We need to specify a schema for adaptor selection -- and
    # simply choose the first one the adaptor offers.
    engine         = getEngine ()
    adaptor_schema = _adaptor.get_schemas()[0]
    self._adaptor  = engine.bind_adaptor (self, 'rs.job.Job', adaptor_schema,
                                          rs.task.NOTASK, _adaptor, _adaptor_state)

As shown above, _adaptor and _adaptor_state are forwarded to the Engine's bind_adaptor() method, and if present will ensure that the resulting API object is bound to the given adaptor. The _adaptor_state dict will be forwarded to the adaptor class level init_instance() call, and can be used to correctly initialize the state of the new adaptor class. An example of adaptor level code for creating a radical.saga.job.Job instance via radical.saga.job.Service.create_job() is below:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
      rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

  @SYNC
  def init_instance (self, rm_url, session) :
    ...

  @SYNC
  def create_job (self, jd) :

    state = { 'job_service'     : self,
              'job_description' : jd,
              'session'         : self._session}

    return rs.job.Job (_adaptor=self._adaptor, _adaptor_state=state)


class GSISSHJob (rs.cpi.job.Job) :
  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJob')
    ...

  @SYNC
  def init_instance (self, job_info):

    self._session        = job_info['session']
    self._jd             = job_info['job_description']
    self._parent_service = job_info['job_service']

    self._id             = None # is assigned when calling job.run()
    self._state          = rs.job.NEW

    # register ourselves with the parent service
    self._parent_service._update_jobid (self, self._id)
    ...

4.1.6. Exception Handling

RADICAL-SAGA defines a set of exceptions which can be thrown on the various method invocations (see section api_exceptions). Adaptor implementors must ensure that the correct exception types are thrown on the corresponding error conditions. If the API layer encounters a non-SAGA exception from the adaptor layer, it will convert it to an rs.NoSuccess exception. While that will reliably shield the user layer from non-SAGA exception types, it is a lossy translation, and likely to hide the underlying cause of the error. This feature is thus to be considered a safeguard, not a preferred method of error state communication!

An example of adaptor level error handling is below:

class ContextX509 (rs.cpi.Context) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'ContextX509')

  @SYNC
  def init_instance (self, type) :
    if not type.lower () in (schema.lower() for schema in _ADAPTOR_SCHEMAS) :
      raise rs.exceptions.BadParameter \
              ("the x509 context adaptor only handles x509 contexts - duh!")

  @SYNC
  def _initialize (self, session) :

    if not self._api.user_proxy :
      self._api.user_proxy = "x509up_u%d"  %  os.getuid()   # fallback

    if not os.path.exists (self._api.user_proxy) or \
       not os.path.isfile (self._api.user_proxy)    :
      raise rs.exceptions.BadParameter ("X509 proxy does not exist: %s"
                                               % self._api.user_proxy)

    try :
      fh = open (self._api.user_proxy)
    except Exception as e:
      raise rs.exceptions.PermissionDenied ("X509 proxy '%s' not readable: %s"
                                           % (self._api.user_proxy, str(e)))
    else :
      fh.close ()
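
The conversion of non-SAGA exceptions into NoSuccess, as described at the start of this section, could be pictured as a wrapper around adaptor calls. This is only a sketch: the exception classes are local stand-ins, and the shield decorator is a hypothetical name, not the mechanism RADICAL-SAGA actually uses:

```python
import functools


class SagaException(Exception):
    """Stand-in for the SAGA exception base class (assumption)."""


class NoSuccess(SagaException):
    pass


class BadParameter(SagaException):
    pass


def shield(method):
    """Let SAGA exceptions pass, convert everything else to NoSuccess."""
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        try:
            return method(*args, **kwargs)
        except SagaException:
            raise                        # already the right kind of error
        except Exception as e:
            # lossy translation: only type name and message survive
            raise NoSuccess('%s: %s' % (type(e).__name__, e)) from e
    return wrapper


@shield
def careless():
    return {}['missing']                 # raises a plain KeyError


@shield
def careful():
    raise BadParameter('unsupported context type')


def raised(func):
    """Return the name of the exception type raised by func, or None."""
    try:
        func()
    except Exception as e:
        return type(e).__name__
    return None
```

Calling raised(careless) reports NoSuccess, while raised(careful) reports BadParameter: the adaptor that raises the specific SAGA exception itself gives the application far more to work with.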

4.1.7. Asynchronous Methods

The SAGA API features several objects which implement both synchronous and asynchronous versions of their respective methods. Synchronous calls will return normal objects or values; asynchronous calls will return radical.saga.Task instances, which represent the ongoing asynchronous method, and can later be inspected for state and return values.

On adaptor level, both method types are distinguished by the method decorators @SYNC and @ASYNC, like this:

class LocalFile (rs.cpi.filesystem.File) :

  def __init__ (self, api, adaptor) :
      rs.cpi.Base.__init__ (self, api, adaptor, 'LocalFile')

  @SYNC
  def init_instance (self, url, flags, session) :
      self._url     = url
      self._flags   = flags
      self._session = session
      ...


  @ASYNC
  def init_instance_async (self, ttype, url, flags, session) :
    self._url     = url
    self._flags   = flags
    self._session = session

    t = rs.task.Task ()
    t._set_result (rs.filesystem.File (url, flags, session, _adaptor_name=_ADAPTOR_NAME))
    t._set_state  (rs.task.DONE)

    return t


  @SYNC
  def get_url (self) :
    return self._url

  @ASYNC
  def get_url_async (self, ttype) :

    t = rs.task.Task ()
    t._set_result (self._url)
    t._set_state  (rs.task.DONE)

    return t

Note that the async calls in the example code above are not really asynchronous, as they both return a task which is in Done state – a proper async call would return a task in New or Running state, without setting the task's result, and would perform some required work in a separate thread or process. Upon completion, the adaptor (which should keep a handle on the created task) would then set the result and state of the task, thus notifying the application of the completion of the asynchronous method call.

Also note that there exists an asynchronous version for the init_instance() method, which is used for the asynchronous API object creation, i.e. on:

import sys
import radical.saga as rs

t = rs.job.Service.create ('ssh://host.net')

t.wait ()

if t.state != rs.task.DONE :
  print ("no job service: " + str(t.exception))
  sys.exit (1)

job_service = t.get_result ()
job_service.run_job ("touch /tmp/hello_world")

The exact semantics of SAGA’s asynchronous operations are described elsewhere (see section api_tasks). Relevant for this discussion is to note that the asynchronous adaptor methods all receive a task type parameter (ttype) which determines the state of the task to return: on ttype==rs.task.TASK, the returned task should be in New state; on ttype==rs.task.ASYNC the task is in Running state, and on ttype==rs.task.SYNC the returned task is already Done. It is up to the adaptor implementor to ensure these semantics – the examples above do not follow them, and are thus incomplete.
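The ttype rules above can be sketched with a toy task class that runs its work in a separate thread. The Task class, the make_task() helper, and the string constants are all hypothetical stand-ins for illustration; they do not reproduce the radical.saga.task API:

```python
import threading

# stand-ins for task states and task types (assumptions)
NEW, RUNNING, DONE = 'New', 'Running', 'Done'
SYNC, ASYNC, TASK = 'sync', 'async', 'task'


class Task(object):
    """Toy task: runs a callable in a thread and stores its result."""

    def __init__(self, work):
        self.state = NEW
        self.result = None
        self._thread = threading.Thread(target=self._execute, args=(work,))

    def _execute(self, work):
        self.result = work()
        self.state = DONE                # set last: completion notification

    def run(self):
        self.state = RUNNING
        self._thread.start()

    def wait(self):
        # only valid after run() has been called
        self._thread.join()


def make_task(work, ttype):
    """Create a task whose state on return matches the requested ttype."""
    task = Task(work)
    if ttype in (ASYNC, SYNC):
        task.run()                       # ASYNC and SYNC tasks are started
    if ttype == SYNC:
        task.wait()                      # SYNC tasks are finished on return
    return task


t_new = make_task(lambda: 42, TASK)      # not started yet
t_sync = make_task(lambda: 42, SYNC)     # already completed
t_async = make_task(lambda: 42, ASYNC)   # started, may still be running
t_async.wait()
```

An ASYNC task may of course already have finished by the time the caller inspects it, so only the New and Done cases can be observed deterministically right after creation.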

4.1.8. Bulk Operations

On API level, there exists no explicit support for bulk operations. Those can, however, be rendered implicitly, by collecting asynchronous operations in a task container, and calling run() on that container:

bulk  = rs.task.Container ()

dir_1 = rs.filesystem.Directory ("gridftp://remote.host1.net/")
for i in range (0, 1000) :

  # zero-padded file index, to avoid blanks in the URLs
  src = "gridftp://remote.host1.net/data/file_%04d.dat"  %  i
  tgt = "gridftp://other.hostx.net/share/file_%04d.dat"  %  i

  bulk.add (dir_1.copy (src, tgt, ttype=rs.task.TASK))


dir_2 = rs.filesystem.Directory ("ssh://remote.host2.net/")
for i in range (0, 1000) :

  src = "ssh://remote.host2.net/data/file_%04d.info"  %  i
  tgt = "ssh://other.hostx.net/share/file_%04d.info"  %  i

  bulk.add (dir_2.copy (src, tgt, ttype=rs.task.TASK))


bulk.run  ()
bulk.wait (rs.task.ALL)

The above task container gets filled by file copy tasks which are addressing two different file transfer protocols, and are thus likely mapped to two different adaptors. The RADICAL-SAGA API implementation will inspect the task container upon bulk.run(), and will attempt to sort the contained tasks by adaptor, i.e. all tasks operating on API objects which bind to the same adaptor instance (not adaptor class instance) will be lumped together into a task bucket. For each bucket, the API will then call the respective bulk operation (container_method) for that adaptor.
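The sorting step can be illustrated with a few lines of plain Python. FakeTask and bucket_by_adaptor() are hypothetical names, and plain objects stand in for adaptor instances; this is a sketch of the idea, not the task container's real implementation:

```python
from collections import defaultdict


class FakeTask(object):
    """Stand-in for a task which remembers the adaptor it is bound to."""

    def __init__(self, name, adaptor):
        self.name = name
        self.adaptor = adaptor


def bucket_by_adaptor(tasks):
    """Group tasks by the adaptor instance they operate on."""
    buckets = defaultdict(list)
    for task in tasks:
        buckets[task.adaptor].append(task)
    return dict(buckets)


# two stand-in adaptor instances, e.g. one for gridftp and one for ssh
gridftp_adaptor = object()
ssh_adaptor = object()

tasks = [FakeTask('copy-a', gridftp_adaptor),
         FakeTask('copy-b', ssh_adaptor),
         FakeTask('copy-c', gridftp_adaptor)]

buckets = bucket_by_adaptor(tasks)
```

Each bucket can then be handed to its adaptor in a single call, which gives the adaptor the chance to optimize the whole batch.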

Note that at this point, the task container implementation does not yet know what adaptor class instance to use for the bulk operations. For that purpose, it will inspect

todo:

Needs completion after generic bulk ops are fixed.

4.1.9. Adaptor Logging

Based on Python's logging facility, RADICAL-SAGA also supports logging, both as an internal auditing and debugging mechanism, and as a capability to support applications (see section util_logging). Adaptor implementors are encouraged to use logging as well – for that purpose, the radical.saga.cpi.AdaptorBase and radical.saga.cpi.Base classes will initialize a self._logger member for all adaptor and adaptor class implementations, respectively.

We advise using the log levels as indicated below:

Log Level   Type of Events Logged
CRITICAL    Only fatal events that will cause the process to abort – that NEVER happen on adaptor level!
ERROR       Events that will prevent the adaptor from functioning correctly.
WARNING     Events that indicate potential problems or unusual events, and can support application diagnostics.
INFO        Events that support adaptor auditing, inform about backend activities, performance etc.
DEBUG       Events which support the tracking of adaptor code paths, to support adaptor debugging (lots of output).
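
As a brief illustration of these levels, the sketch below uses Python's standard logging module directly. The logger name, the submit() function, and the ListHandler class are made up for this example; inside a real adaptor the pre-initialized self._logger would be used instead:

```python
import logging


class ListHandler(logging.Handler):
    """Collects (level, message) pairs so the output can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


logger = logging.getLogger('demo.adaptor')    # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False                      # keep the demo output local
handler = ListHandler()
logger.addHandler(handler)


def submit(cmd, connected=True):
    """Pretend to submit a command, logging at the suggested levels."""
    logger.debug('submit called with %r', cmd)             # code path tracking
    if not connected:
        # the adaptor cannot do its job: ERROR, not CRITICAL
        logger.error('no backend connection, cannot submit %r', cmd)
        return False
    logger.info('submitted %r', cmd)                       # backend activity
    return True


ok = submit('/bin/date')
failed = submit('/bin/date', connected=False)
```

Note that the failure case logs at ERROR level and returns; it does not use CRITICAL, in line with the table above.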

4.1.10. External Processes

For many adaptor implementations, it is beneficial to interface to external processes. In particular, all adaptors performing remote interactions over ssh or gsissh secured connections will likely need to interact with the remote user shell. The classes radical.saga.utils.pty_process.PTYProcess and (on top of PTYProcess) radical.saga.utils.pty_shell.PTYShell are designed to simplify those interactions, and at the same time be more performant than, for example, pexpect.

class radical.saga.utils.pty_shell.PTYShell(url, session=None, logger=None, cfg=None, posix=True, interactive=True)

This class wraps a shell process and runs it as a PTYProcess. The user of this class can start that shell, and run arbitrary commands on it.

The shell to be run is expected to be POSIX compliant (bash, dash, sh, ksh etc.) – in particular, we expect the following features: $?, $!, $#, $*, $@, $$, $PPID, >&, >>, >, <, |, ||, (), &, &&, wait, kill, nohup, shift, export, PS1, and PS2.

Note that PTYShell will change the shell prompts (PS1 and PS2), to simplify output parsing. PS2 will be empty, PS1 will be set to PROMPT-$?-> – that way, the prompt will report the exit value of the last command, saving an extra roundtrip. Users of this class should be careful when setting other prompts – see set_prompt() for more details.

Usage Example:

# start the shell, find its prompt.
self.shell = rs.utils.pty_shell.PTYShell("ssh://user@rem.host.net/",
                                         contexts, self._logger)

# run a simple shell command, merge stderr with stdout.  $$ is the pid
# of the shell instance.
ret, out, _ = self.shell.run_sync (" mkdir -p /tmp/data.$$/" )

# check if mkdir reported success
if  ret != 0 :
    raise rs.NoSuccess ("failed to prepare dir (%s)(%s)" % (ret, out))

# stage some data from a local string variable
# into a file on the remote system
self.shell.stage_to_remote (src = pbs_job_script,
                            tgt = "/tmp/data.$$/job_1.pbs")

# check size of staged script (this is actually done on PTYShell level
# already, with no extra hop):
ret, out, _ = self.shell.run_sync("stat -c '%s' /tmp/data.$$/job_1.pbs")
if  ret != 0 :
    raise rs.NoSuccess ("failed to check size (%s)(%s)" % (ret, out))

assert (len(pbs_job_script) == int(out))

Data Staging and Data Management:

The PTYShell class does not only support command execution, but also basic data management: for SSH based shells, it will create a tunneled scp/sftp connection for file staging. Other data management operations (mkdir, size, list, …) are executed either as shell commands, or on the scp/sftp channel (if possible on the data channel, to keep the shell pty free for concurrent command execution). Ssh tunneling is implemented via ssh.v2 ‘ControlMaster’ capabilities (see ssh_config(5)).

For local shells, PTYShell will create an additional shell pty for data management operations.

Asynchronous Notifications:

A third pty process will be created for asynchronous notifications. For that purpose, the shell started on the first channel will create a named pipe, at:

$RADICAL_BASE/.radical/saga/adaptors/shell/async.$$

$$ here represents the pid of the shell process. It will also set the environment variable SAGA_ASYNC_PIPE to point to that named pipe – any application running on the remote host can write event messages to that pipe, which will be available on the local end (see below). PTYShell leaves it unspecified what format those messages have, but messages are expected to be separated by newlines.

An adaptor using PTYShell can subscribe for messages via:

self.pty_shell.subscribe (callback)

where callback is a Python callable. PTYShell will listen on the event channel in a separate thread and invoke that callback on any received message, passing the message text (sans newline) to the callback.
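
The listener pattern described here can be sketched with a local pipe and a thread. This is not PTYShell's implementation: the listen() function is a hypothetical stand-in, and an anonymous os.pipe() replaces the remote named pipe so that the example is self-contained:

```python
import os
import threading


def listen(fd, callback):
    """Read newline-separated messages from fd, pass each to the callback."""
    with os.fdopen(fd, 'r') as stream:
        for line in stream:
            callback(line.rstrip('\n'))   # message text, sans newline


received = []                             # the callback simply collects messages
read_fd, write_fd = os.pipe()

listener = threading.Thread(target=listen, args=(read_fd, received.append))
listener.start()

# a writer (here: this process) sends two event messages, then closes the pipe
os.write(write_fd, b'job 1 done\njob 2 fail\n')
os.close(write_fd)

listener.join()                           # listener ends once the pipe is closed
```

Since the callback runs in the listener thread, any state it touches has to be protected in the same way as other shared adaptor state.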

An example usage: the command channel may run the following command line:

( sh -c 'sleep 100 && echo "job $$ done" > "$SAGA_ASYNC_PIPE" || echo "job $$ fail" > "$SAGA_ASYNC_PIPE"' ) &

which will return immediately, and send a notification message at job completion.

Note that writes to named pipes are only guaranteed to be atomic up to a certain size. From POSIX:

"A write is atomic if the whole amount written in one operation is not interleaved with data from any other process. This is useful when there are multiple writers sending data to a single reader. Applications need to know how large a write request can be expected to be performed atomically. This maximum is called {PIPE_BUF}. This volume of IEEE Std 1003.1-2001 does not say whether write requests for more than {PIPE_BUF} bytes are atomic, but requires that writes of {PIPE_BUF} or fewer bytes shall be atomic."

Thus the user is responsible for ensuring either that messages are no larger than PIPE_BUF bytes on the remote system (POSIX requires at least 512; on Linux the value is 4096), or that the pipe is locked for larger writes.
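
A writer can check a message against that limit before sending it. The sketch below queries the limit of the local system via os.fpathconf(), which is available on Unix; the helper names are made up for this example, and for a remote pipe the remote system's PIPE_BUF is the value that matters:

```python
import os


def pipe_buf_limit():
    """Return PIPE_BUF for pipes on the local system (Unix only)."""
    read_fd, write_fd = os.pipe()
    try:
        return os.fpathconf(write_fd, 'PC_PIPE_BUF')
    finally:
        os.close(read_fd)
        os.close(write_fd)


def fits_atomically(message, limit):
    """True if the message plus its newline separator is an atomic write."""
    data = message.encode('utf-8') + b'\n'
    return len(data) <= limit


limit = pipe_buf_limit()

short_ok = fits_atomically('job 42 done', limit)
long_ok = fits_atomically('x' * limit, limit)   # one byte too long with newline
```

Messages that fail the check would need to be split up, or the pipe would need to be locked while they are written.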

Automated Restart, Timeouts:

For timeout and restart semantics, please see the documentation of the underlying radical.saga.utils.pty_process.PTYProcess class.

class radical.saga.utils.pty_process.PTYProcess(command, cfg='utils', logger=None)

This class spawns a process, providing that child with pty I/O channels – it will maintain stdin, stdout and stderr channels to the child. All write-like operations operate on the stdin, all read-like operations operate on the stdout stream. Data from the stderr stream are at this point redirected to the stdout channel.

Example:

# run an interactive client process
pty = PTYProcess ("/usr/bin/ssh -t localhost")

# patterns (prompts) to look for in the client's I/O; raw strings keep
# the regular expression escapes intact
prompts = [r'password\s*:\s*$',
           r'want to continue connecting.*\(yes/no\)\s*$',
           r'[\$#>]\s*$']

# check client's I/O for one of the patterns above.
n, match = pty.find (prompts)

while True :

    if n == 0 :
        # found password prompt - tell the secret.  Then search again.
        pty.write (b"secret\n")
        n, _ = pty.find (prompts)
    elif n == 1 :
        # found request to accept host key - sure we do... (who checks
        # those keys anyways...?).  Then search again.
        pty.write (b"yes\n")
        n, _ = pty.find (prompts)
    elif n == 2 :
        # found shell prompt!  Wohoo!
        break


while True :
    # go full Dornroeschen (Sleeping Beauty)...
    if not pty.alive (recover=True) :         # check / restart process
        break
    pty.find  ([r'[\$#>]\s*$'])               # find shell prompt
    pty.write (b'/bin/sleep "100 years"\n')   # sleep!  SLEEEP!

# something bad happened
print (pty.autopsy ())