4.1. Writing RADICAL-SAGA Adaptors


This part of the RADICAL-SAGA documentation is not for users of RADICAL-SAGA, but rather for implementors of backend adaptors (although it may be beneficial for users to skim over section Adaptor Binding to gain some insight into RADICAL-SAGA’s mode of operation).

4.1.1. Adaptor Structure

A RADICAL-SAGA adaptor is a Python module with well defined structure. The module must expose a class Adaptor, which (a) must be a singleton, (b) must provide a sanity_check() method, and (c) must inherit from radical.saga.cpi.AdaptorBase. That base class’ constructor (__init__) must be called like this:

class Adaptor (rs.cpi.AdaptorBase):

  __metaclass__ = Singleton

  def __init__ (self) :
    rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)

  def sanity_check (self) :
    # FIXME: detect gsissh tool

_ADAPTOR_INFO and _ADAPTOR_OPTIONS are module level Python dicts with the following layout:

_ADAPTOR_NAME          = 'rs.adaptor.gsissh.job'
_ADAPTOR_SCHEMAS       = ['ssh', 'gsissh']
  'category'         : _ADAPTOR_NAME,
  'name'             : 'cache_connection',
  'type'             : bool,
  'default'          : True,
  'valid_options'    : [True, False],
  'documentation'    : 'toggle connection caching',
  'env_variable'     : None
_ADAPTOR_INFO          = {
  'name'             : _ADAPTOR_NAME,
  'version'          : 'v0.1',
  'cpis'             : [
    'type'         : 'rs.job.Service',
    'class'        : 'GSISSHJobService',
    'schemas'      : _ADAPTOR_SCHEMAS
    'type'         : 'rs.job.Job',
    'class'        : 'GSISSHJob',
    'schemas'      : _ADAPTOR_SCHEMAS

(It is beneficial to specify _ADAPTOR_NAME and _ADAPTOR_SCHEMAS separately, as they are used in multiple places, as explained later on.)

The adaptor classes listed in the _ADAPTOR_INFO (in this example, GSISSHJob and GSISSHJobService) are the classes which are actually bound to a SAGA API object, and provide its functionality. For that purpose, those classes must inherit the respective object’s Capability Provider Interface (CPI), as shown in the stub below:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

  def init_instance (self, rm_url, session) :
    self._rm      = rm_url
    self._session = session

The radical.saga.cpi.Base class will make sure that the adaptor classes keep a self._adaptor member, pointing to the adaptor singleton instance (i.e. the module’s Adaptor class instance). It will further initialize a logging module (available as self._logger).

Note that the adaptor class’ __init__ does not correspond to the API level object __init__ – instead, the adaptor class construction is a two step process, and the actual constructor semantics is mapped to an init_instance() method, which receives the API level constructor arguments.

4.1.2. Adaptor Registration

Any SAGA adaptor must be registered in the :ref:Engine in order to be usable. That process is very simple, and performed by the radical.saga.cpi.AdaptorBase class – so all the adaptor has to take care of is the correct initialization of that base class, as described in Adaptor Structure. The AdaptorBase will forward the _ADAPTOR_INFO to the radical.saga.engine.Engine class, where the adaptor will be added to a registry of adaptor classes, hierarchically sorted like this (simplified):

Engine._adaptor_registry =
  'rs.job.Service' :
    'gshiss' : [rs.adaptors.gsissh.job.GSISSHJobService, ...]
    'ssh'    : [rs.adaptors.gsissh.job.GSISSHJobService, ...]
    'gram'   : [rs.adaptors.globus.job.GRAMJobService, ...]
  'rs.job.Job' :
    'gshiss' : [rs.adaptors.gsissh.job.GSISSHJob, ...]
    'ssh'    : [rs.adaptors.gsissh.job.GSISSHJob, ...]
    'gram'   : [rs.adaptors.globus.job.GRAMJob, ...]

That registry is searched when the engine binds an adaptor class instance to a SAGA API object instance – see Adaptor Binding.

4.1.3. Adaptor Binding

Whenever a SAGA API object is created, or whenever any method is invoked on that object, the RADICAL-SAGA implementation needs to (a) select a suitable backend adaptor to perform that operation, and (b) invoke the respective adaptor functionality.

The first part, selecting a specific adaptor for a specific API object instance, is called binding – RADICAL-SAGA binds an adaptor to an object exactly once, at creation time, and the bond remains valid for the lifetime of the API object: on API creation, the API object will request a suitable adaptor from the engine, and will keep it for further method invocations (code simplified):

class Service (object) :

  def __init__ (self, url=None, session=None) :
    self._engine  = getEngine ()
    self._adaptor = self._engine.get_adaptor (self, 'rs.job.Service', url.scheme, ...,
                                                url, session)

  def run_job (self, cmd, host="", ttype=None) :
    return self._adaptor.run_job (cmd, host, ttype=ttype)


The Engine.get_adaptor call will iterate through the engine’s adaptor registry, and will, for all adaptors which indicated support for the given URL scheme, request an adaptor class instance for the given API class. If an adaptor class instance can successfully be created, the engine will further attempt to call the adaptor class’ init_instance method, which will in fact construct an adaptor level representation of the API level object:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

  def init_instance (self, url, session) :
    # - check if session contains suitable security tokens for (gsi)ssh
    # - check if endpoint given by 'url' can be served
    # - establish and cache connection to that endpoint, with the sesssion
    #   credentials

If either the adaptor class instantiation or the init_instance invocation raise an exception, the engine will try the next registered adaptor for that API class / url scheme combo. If both steps are successful, the adaptor class instance will be returned to the API object’s constructor, as shown above.

4.1.4. Adaptor State

Instances of adaptor classes will often need to share some state. For example, different instances of rs.job.Job running via ssh on a specific host may want to share a single ssh connection; asynchronous operations on a specific adaptor may want to share a thread pool; adaptor class instances of a specific resource adaptor may want to share a notification endpoint. State sharing supports scalability, and can simplify adaptor code – but also adds some overhead to exchange and synchronize state between those adaptor class instances.

The preferred way to share state is to use the adaptor instance (as it was created by the engine while loading the adaptor’s module) for state exchange (see section Adaptor Registration – all adaptor class instances get the spawning adaptor instance passed at creation time:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

radical.saga.cpi.Base will make that instance available as self._adaptor. As that adaptor class is part of the adaptor modules code base, and thus under full control of the adaptor developer, it is straight forward to use it for state caching and state exchange. Based on the example code in section Adaptor Structure, a connection caching adaptor class could look like this:

class Adaptor (rs.cpi.base.AdaptorBase):

  __metaclass__ = Singleton

  def __init__ (self) :
    rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
    self._cache = {}

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
    self._cache = self._adaptor._cache

  def init_instance (self, rm_url, session) :
    if not self._rm in self._adaptor.keys () :
      self._cache [self._rm] = setup_connection (self._rm)

  def run_job (self, cmd) :
    connection = self._cache [self._rm]
    return connection.run (cmd)

The adaptor implementor is responsible for the consistency of the shared state, and may need to use locking to ensure proper consistency in multithreaded environments – the self._adaptor class merely provides a shared container for the data, nothing else. Also, the Adaptor class’ destructor should take care of freeing the cached / shared state objects (unless another cleanup mechanism is in place).

4.1.5. Creating API Objects

Several SAGA API objects feature factory-like methods – examples are Directory.open(), job.Service.create_job()/run_job(), and resource.Manager.aquire(). To correctly implement those methods on adaptor level, adaptors need to be able to instantiate the API objects to return. We have seen in section Adaptor Binding that, on API object creation, the Engine will select and bind a suitable adaptor to the object instance. In many cases, however, an implementation of a factory-like API method will want to make sure that the resulting API object is bound to the same adaptor instance as the spawning adaptor class instance itself. For that purpose, all API object constructors will accept two additional parameters: _adaptor (type: radical.saga.cpi.Base or derivative), and _adaptor_state (type: dict). This is also provided for API objects which normally have no public constructor at all:

class Job (rs.attributes.Attributes, rs.task.Async) :

  def __init__ (self, _adaptor=None, _adaptor_state={}) :

    if not _adaptor :
        raise rs.exceptions.IncorrectState ("rs.job.Job constructor is private")

    # bind to the given adaptor -- this will create the required adaptor
    # class.  We need to specify a schema for adaptor selection -- and
    # simply choose the first one the adaptor offers.
    engine         = getEngine ()
    adaptor_schema = _adaptor.get_schemas()[0]
    self._adaptor  = engine.bind_adaptor (self, 'rs.job.Job', adaptor_schema,
                                          rs.task.NOTASK, _adaptor, _adaptor_state)

As shown above, _adaptor and _adaptor_state are forwarded to the Engine’s bind_adaptor() method, and if present will ensure that the resulting API object is bound to the given adaptor. The _adaptor_state dict will be forwarded to the adaptor class level init_instance() call, and can be used to correctly initialize the state of the new adaptor class. An example of adaptor level code for creating an radical.saga.job.Job instance via radical.saga.job.Service.create_job() is below:

class GSISSHJobService (rs.cpi.job.Service) :

  def __init__ (self, api, adaptor) :
      rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

  def init_instance (self, rm_url, session) :

  def create_job (self, jd) :

    state = { 'job_service'     : self,
              'job_description' : jd,
              'session'         : self._session}

    return rs.job.Job (_adaptor=self._adaptor, _adaptor_state=state)

class GSISSHJob (rs.cpi.job.Job) :
  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJob')

  def init_instance (self, job_info):

    self._session        = job_info['session']
    self._jd             = job_info['job_description']
    self._parent_service = job_info['job_service']

    self._id             = None # is assigned when calling job.run()
    self._state          = rs.job.NEW

    # register ourselves with the parent service
    self._parent_service._update_jobid (self, self._id)

4.1.6. Exception Handling

RADICAL-SAGA defines a set of exceptions which can be thrown on the various method invocations (see section api_exceptions. Adaptor implementors must ensure, that the correct exception types are thrown on the corresponding error conditions. If the API layer encounters a non-SAGA exception from the adaptor layer, it will convert it to a rs.NoSuccess exception. While that will reliably shield the user layer from non-SAGA exception types, it is a lossy translation, and likely to hide the underlying cause of the error. This feature is thus to be considered as a safe guard, not as a preferred method of error state communication!

An example of adaptor level error handling is below:

class ContextX509 (rs.cpi.Context) :

  def __init__ (self, api, adaptor) :
    rs.cpi.Base.__init__ (self, api, adaptor, 'ContextX509')

  def init_instance (self, type) :
    if not type.lower () in (schema.lower() for schema in _ADAPTOR_SCHEMAS) :
      raise rs.exceptions.BadParameter \
              ("the x509 context adaptor only handles x509 contexts - duh!")

  def _initialize (self, session) :

    if not self._api.user_proxy :
      self._api.user_proxy = "x509up_u%d"  %  os.getuid()   # fallback

    if not os.path.exists (self._api.user_proxy) or \
       not os.path.isfile (self._api.user_proxy)    :
      raise rs.exceptions.BadParameter ("X509 proxy does not exist: %s"
                                               % self._api.user_proxy)

    try :
      fh = open (self._api.user_proxy)
    except Exception as e:
      raise rs.exceptions.PermissionDenied ("X509 proxy '%s' not readable: %s"
                                           % (self._api.user_proxy, str(e)))
    else :
      fh.close ()

4.1.7. Asynchronous Methods

The SAGA API features several objects which implement both synchronous and asynchronous versions of their respective methods. Synchronous calls will return normal objects or values; asynchronous calls will return radical.saga.Task instances, which represent the ongoing asynchronous method, and can later be inspected for state and return values.

On adaptor level, both method types are differences by the method decorators @SYNC and @ASYNC, like this:

class LocalFile (rs.cpi.filesystem.File) :

  def __init__ (self, api, adaptor) :
      rs.cpi.Base.__init__ (self, api, adaptor, 'LocalFile')

  def init_instance (self, url, flags, session) :
      self._url     = url
      self._flags   = flags
      self._session = session

  def init_instance_async (self, ttype, url, flags, session) :
    self._url     = url
    self._flags   = flags
    self._session = session

    t = rs.task.Task ()
    t._set_result (rs.filesystem.File (url, flags, session, _adaptor_name=_ADAPTOR_NAME))
    t._set_state  (rs.task.DONE)

    return t

  def get_url (self) :
    return self._url

  def get_url_async (self, ttype) :

    t = rs.task.Task ()
    t._set_result (self._url)
    t._set_state  (rs.task.DONE)

    return t

Note that the async calls in the example code above are not really asynchronous, as they both return a task which is in Done state – a proper async call would return a task in New or Running state, without setting the task’s result, and would perform some required work in a separate thread or process. Upon completion, the adaptor (which should keep a handle on the created task) would then set the result and state of the task, thus notifying the application of the completion of the asynchronous method call.

Also note that there exists an asynchronous version for the init_instance() method, which is used for the asynchronous API object creation, i.e. on:

#import sys
#import radical.saga as rs

t = rs.job.Service.create ('ssh://host.net')

t.wait ()

if t.state != rs.task.DONE :
  print "no job service: " + str(t.exception)
  sys.exit (0)

job_service = t.get_result ()
job_service.run_job ("touch /tmp/hello_world")

The exact semantics of SAGA’s asynchronous operations is described elsewhere (see section api_tasks). Relevant for this discussion is to note that the asynchronous adaptor methods all receive a task type parameter (ttype) which determines the state of the task to return: on ttype==rs.task.TASK, the returned task should be in New state; on ttype==rs.task.ASYNC the task is in Running state, and on ttype==rs.task.SYNC the returned task is already Done. It is up to the adaptor implementor to ensure that semantics – the examples above do not follow it, and are thus incomplete.

4.1.8. Bulk Operations:

On API level, there exists no explicit support for bulk operations. Those can, however, be rendered implicitly, by collecting asynchronous operations in a task container, and calling run() on that container:

bulk  = rs.task.Container ()

dir_1 = rs.filesystem.Directory ("gridftp://remote.host1.net/")
for i in ranger (0, 1000) :

  src = "gridftp://remote.host1.net/data/file_%4d.dat"  %  i
  tgt = "gridftp://other.hostx.net/share/file_%4d.dat"  %  i

  bulk.add (dir1.copy (src, tgt, rs.task.TASK))

dir_2 = rs.filesystem.Directory ("ssh://remote.host2.net/")
for i in ranger (0, 1000) :

  src = "ssh://remote.host2.net/data/file_%4d.info"  %  i
  tgt = "ssh://other.hostx.net/share/file_%4d.info"  %  i

  bulk.add (dir2.copy (src, tgt, rs.task.TASK))

bulk.run  ()
bulk.wait (rs.task.ALL)

The above task container gets filled by file copy tasks which are addressing two different file transfer protocols, and are thus likely mapped to two different adaptors. The RADICAL-SAGA API implementation will inspect the task container upon bulk.run(), and will attempt to sort the contained tasks by adaptor, i.e. all tasks operating on API objects which bind to the same adaptor instance (not adaptor class instance) will be lumped together into a task bucket. For each bucket, the API will then call the respective bulk operation (container_method) for that adaptor.

Note that at this point, the task container implementation does not yet know what adaptor class instance to use for the bulk operations. For that purpose, it will inspect

todo:Needs completion after generic bulk ops are fixed.

4.1.9. Adaptor Logging

Based on Python’s logging facility, RADICAL-SAGA also supports logging, both as an internal auditing and debugging mechanism, and as application supporting capability (see section util_logging. Adaptor implementors are encouraged to use logging as well – for that purposed, the radical.saga.cpi.AdaptorBase and radical.saga.cpi.Base classes will initialize a self._logger member for all adaptor and adaptor class implementations, respectively.

We advice to use the log levels as indicated below:

Log Level Type of Events Logged
CRITICAL Only fatal events that will cause the process to abort – that NEVER happen on adaptor level!
ERROR Events that will prevent the adaptor from functioning correctly.
WARNING Events that indicate potential problems or unusual events, and can support application diagnostics.
INFO Events that support adaptor auditing, inform about backend activities, performance etc.
DEBUG Events which support the tracking of adaptor code paths, to support adaptor debugging (lots of output).

4.1.10. External Processes

For many adaptor implementations, it is beneficial to interface to external processes. In particular, all adaptors performing remote interactions over ssh or gsissh secured connections will likely need to interact with the remote user shell. The classes radical.saga.utils.pty_process.PTYProcess and (on top of PTYProcess) radical.saga.utils.pty_shell.PTYShell are designed to simplify those interactions, and at the same time be more performant than, for example, pexpect.

class radical.saga.utils.pty_shell.PTYShell(url, session=None, logger=None, cfg=None, posix=True, interactive=True)[source]

Bases: object

This class wraps a shell process and runs it as a PTYProcess. The user of this class can start that shell, and run arbitrary commands on it.

The shell to be run is expected to be POSIX compliant (bash, dash, sh, ksh etc.) – in particular, we expect the following features: $?, $!, $#, $*, $@, $$, $PPID, >&, >>, >, <, |, ||, (), &, &&, wait, kill, nohup, shift, export, PS1, and PS2.

Note that PTYShell will change the shell prompts (PS1 and PS2), to simplify output parsing. PS2 will be empty, PS1 will be set PROMPT-$?-> – that way, the prompt will report the exit value of the last command, saving an extra roundtrip. Users of this class should be careful when setting other prompts – see set_prompt() for more details.

Usage Example:

# start the shell, find its prompt.
self.shell = saga.utils.pty_shell.PTYShell("ssh://user@rem.host.net/",
                                           contexts, self._logger)

# run a simple shell command, merge stderr with stdout.  $$ is the pid
# of the shell instance.
ret, out, _ = self.shell.run_sync (" mkdir -p /tmp/data.$$/" )

# check if mkdir reported success
if  ret != 0 :
    raise saga.NoSuccess ("failed to prepare dir (%s)(%s)" % (ret, out))

# stage some data from a local string variable
# into a file on the remote system
self.shell.stage_to_remote (src = pbs_job_script,
                            tgt = "/tmp/data.$$/job_1.pbs")

# check size of staged script (this is actually done on PTYShell level
# already, with no extra hop):
ret, out, _ = self.shell.run_sync("stat -c '%s' /tmp/data.$$/job_1.pbs")
if  ret != 0 :
    raise saga.NoSuccess ("failed to check size (%s)(%s)" % (ret, out))

assert (len(pbs_job_script) == int(out))

Data Staging and Data Management:

The PTYShell class does not only support command execution, but also basic data management: for SSH based shells, it will create a tunneled scp/sftp connection for file staging. Other data management operations (mkdir, size, list, …) are executed either as shell commands, or on the scp/sftp channel (if possible on the data channel, to keep the shell pty free for concurrent command execution). Ssh tunneling is implemented via ssh.v2 ‘ControlMaster’ capabilities (see ssh_config(5)).

For local shells, PTYShell will create an additional shell pty for data management operations.

Asynchronous Notifications:

A third pty process will be created for asynchronous notifications. For that purpose, the shell started on the first channel will create a named pipe, at:


$$ here represents the pid of the shell process. It will also set the environment variable SAGA_ASYNC_PIPE to point to that named pipe – any application running on the remote host can write event messages to that pipe, which will be available on the local end (see below). PTYShell leaves it unspecified what format those messages have, but messages are expected to be separated by newlines.

An adaptor using PTYShell can subscribe for messages via:

self.pty_shell.subscribe (callback)

where callback is a Python callable. PTYShell will listen on the event channel in a separate thread and invoke that callback on any received message, passing the message text (sans newline) to the callback.

An example usage: the command channel may run the following command line:

( sh -c 'sleep 100 && echo "job $$ done" > $SAGA_ASYNC_PIPE"                          || echo "job $$ fail" > $SAGA_ASYNC_PIPE" ) &

which will return immediately, and send a notification message at job completion.

Note that writes to named pipes are not atomic. From POSIX:

``A write is atomic if the whole amount written in one operation is not interleaved with data from any other process. This is useful when there are multiple writers sending data to a single reader. Applications need to know how large a write request can be expected to be performed atomically. This maximum is called {PIPE_BUF}. This volume of IEEE Std 1003.1-2001 does not say whether write requests for more than {PIPE_BUF} bytes are atomic, but requires that writes of {PIPE_BUF} or fewer bytes shall be atomic.`

Thus the user is responsible for ensuring that either messages are smaller than PIPE_BUF bytes on the remote system (usually at least 1024, on Linux usually 4096), or to lock the pipe on larger writes.

Automated Restart, Timeouts:

For timeout and restart semantics, please see the documentation to the underlying saga.utils.pty_process.PTYProcess class.


initialize the shell connection.


The shell is assumed to be alive if the shell processes lives. Attempt to restart shell if recover==True


If run_async was called, a command is running on the shell. find_prompt can be used to collect its output up to the point where the shell prompt re-appears (i.e. when the command finishes).

Note that this method blocks until the command finishes. Future versions of this call may add a timeout parameter.

find(patterns, timeout=-1)[source]

Note that this method blocks until pattern is found in the shell I/O.

Parameters:new_prompt (string) – a regular expression matching the shell prompt

The new_prompt regex is expected to be a regular expression with one set of catching brackets, which MUST return the previous command’s exit status. This method will send a newline to the client, and expects to find the prompt with the exit value ‘0’.

As a side effect, this method will discard all previous data on the pty, thus effectively flushing the pty output.

By encoding the exit value in the command prompt, we safe one roundtrip. The prompt on Posix compliant shells can be set, for example, via:

PS1='PROMPT-$?->'; export PS1

The newline in the example above allows to nicely anchor the regular expression, which would look like:


The regex is compiled with ‘re.DOTALL’, so the dot character matches all characters, including line breaks. Be careful not to match more than the exact prompt – otherwise, a prompt search will swallow stdout data. For example, the following regex:


would capture arbitrary strings, and would thus match all of:

data/ info

and thus swallow the ls output…

Note that the string match before the prompt regex is non-gready – if the output contains multiple occurrences of the prompt, only the match up to the first occurence is returned.

run_sync(command, iomode=None, new_prompt=None)[source]

Run a shell command, and report exit code, stdout and stderr (all three will be returned in a tuple). The call will block until the command finishes (more exactly, until we find the prompt again on the shell’s I/O stream), and cannot be interrupted.

  • command (string) – shell command to run.
  • iomode (enum) – Defines how stdout and stderr are captured.
  • new_prompt (string) – regular expression matching the prompt after

command succeeded.

We expect the command to not to do stdio redirection, as this is we want to capture that separately. We do allow pipes and stdin/stdout redirection. Note that SEPARATE mode will break if the job is run in the background

The following iomode values are valid:

  • IGNORE: both stdout and stderr are discarded, None will be
    returned for each.
  • MERGED: both streams will be merged and returned as stdout;
    stderr will be None. This is the default.
  • SEPARATE: stdout and stderr will be captured separately, and
    returned individually. Note that this will require at least one more network hop!
  • STDOUT: only stdout is captured, stderr will be None.
  • STDERR: only stderr is captured, stdout will be None.
  • None: do not perform any redirection – this is effectively
    the same as MERGED

If any of the requested output streams does not return any data, an empty string is returned.

If the command to be run changes the prompt to be expected for the shell, the new_prompt parameter MUST contain a regex to match the new prompt. The same conventions as for set_prompt() hold – i.e. we expect the prompt regex to capture the exit status of the process.


Run a shell command, but don’t wait for prompt – just return. It is up to caller to eventually search for the prompt again (see find_prompt(). Meanwhile, the caller can interact with the called command, via the I/O channels.

Parameters:command (string) – shell command to run.

For async execution, we don’t care if the command is doing i/o redirection or not.


send data to the shell. No newline is appended!

write_to_remote(src, tgt)[source]
  • src (string) – data to be staged into the target file
  • tgt (string) – path to target file to staged to The tgt path is not an URL, but expected to be a path relative to the shell’s URL.

The content of the given string is pasted into a file (specified by tgt) on the remote system. If that file exists, it is overwritten. A NoSuccess exception is raised if writing the file was not possible (missing permissions, incorrect path, etc.).

Parameters:src (string) – path to source file to staged from The src path is not an URL, but expected to be a path relative to the shell’s URL.
stage_to_remote(src, tgt, cp_flags=None)[source]
  • src (string) – path of local source file to be stage from. The tgt path is not an URL, but expected to be a path relative to the current working directory.
  • tgt (string) – path to target file to stage to. The tgt path is not an URL, but expected to be a path relative to the shell’s URL.
stage_from_remote(src, tgt, cp_flags='')[source]
  • src (string) – path to source file to stage from. The tgt path is not an URL, but expected to be a path relative to the shell’s URL.
  • tgt (string) – path of local target file to stage to. The tgt path is not an URL, but expected to be a path relative to the current working directory.
run_copy_to(src, tgt, cp_flags=None)[source]

This initiates a slave copy connection. Src is interpreted as local path, tgt as path on the remote host.

Now, this is ugly when over sftp: sftp supports recursive copy, and wildcards, all right – but for recursive copies, it wants the target dir to exist – so, we have to check if the local src is a dir, and if so, we first create the target before the copy. Worse, for wildcards we have to do a local expansion, and then to do the same for each entry…

run_copy_from(src, tgt, cp_flags='')[source]

This initiates a slave copy connection. Src is interpreted as path on the remote host, tgt as local path.

We have to do the same mkdir trick as for the run_copy_to, but here we need to expand wildcards on the remote side :/

class radical.saga.utils.pty_process.PTYProcess(command, cfg='utils', logger=None)[source]

Bases: object

This class spawns a process, providing that child with pty I/O channels – it will maintain stdin, stdout and stderr channels to the child. All write-like operations operate on the stdin, all read-like operations operate on the stdout stream. Data from the stderr stream are at this point redirected to the stdout channel.


# run an interactive client process
pty = PTYProcess ("/usr/bin/ssh -t localhost")

# check client's I/O for one of the following patterns (prompts).
# Then search again.
n, match = pty.find (['password\s*:\s*$',
                      'want to continue connecting.*\(yes/no\)\s*$',

while True :

    if n == 0 :
        # found password prompt - tell the secret
        n, _ = pty.find (['password\s*:\s*$',
                          'want to continue connecting.*\(yes/no\)\s*$',
    elif n == 1 :
        # found request to accept host key - sure we do... (who checks
        # those keys anyways...?).  Then search again.
        n, _ = pty.find (['password\s*:\s*$',
                          'want to continue connecting.*\(yes/no\)\s*$',
    elif n == 2 :
        # found shell prompt!  Wohoo!

while True :
    # go full Dornroeschen (Sleeping Beauty)...
    pty.alive (recover=True) or break      # check / restart process
    pty.find  (['[\$#>]\s*$'])             # find shell prompt
    pty.write(b"/bin/sleep "100 years"\n") # sleep!  SLEEEP!

# something bad happened
print(pty.autopsy ())

kill the child, close all I/O channels


blocks forever until the child finishes on its own, or is getting killed.

Actully, we might just as well try to figure out what is going on on the remote end of things – so we read the pipe until the child dies…


try to determine if the child process is still active. If not, mark the child as dead and close all IO descriptors etc (“func:finalize).

If recover is True and the child is indeed dead, we attempt to re-initialize it (initialize()). We only do that for so many times (self.recover_max) before giving up – at that point it seems likely that the child exits due to a re-occurring operations condition.

Note that upstream consumers of the PTYProcess should be careful to only use recover=True when they can indeed handle a disconnected/reconnected client at that point, i.e. if there are no assumptions on persistent state beyond those in control of the upstream consumers themselves.


return diagnostics information string for dead child processes

read(size=0, timeout=0, _force=False)[source]

read some data from the child. By default, the method reads whatever is available on the next read, up to _CHUNKSIZE, but other read sizes can be specified.

The method will return whatever data it has at timeout:

timeout == 0 : return the content of the first successful read, with
               whatever data up to 'size' have been found.
timeout <  0 : return after first read attempt, even if no data have
               been available.

If no data are found, the method returns an empty string (not None).

This method will not fill the cache, but will just read whatever data it needs (FIXME).

Note: the returned lines do not get ‘\r’ stripped.

find(patterns, timeout=0)[source]

This methods reads bytes from the child process until a string matching any of the given patterns is found. If that is found, all read data are returned as a string, up to (and including) the match. Note that pattern can match an empty string, and the call then will return just that, an empty string. If all patterns end with matching a newline, this method is effectively matching lines – but note that ‘$’ will also match the end of the (currently available) data stream.

The call actually returns a tuple, containing the index of the matching pattern, and the string up to the match as described above.

If no pattern is found before timeout, the call returns (None, None). Negative timeouts will block until a match is found

Note that the pattern are interpreted with the re.M (multi-line) and re.S (dot matches all) regex flags.

Performance: the call is doing repeated string regex searches over whatever data it finds. On complex regexes, and large data, and small read buffers, this method can be expensive.

Note: the returned data get ‘\r’ stripped.

Note: ansi-escape sequences are also stripped before matching, but are kept in the returned data.

write(data, nolog=False)[source]

This method will repeatedly attempt to push the given data into the child’s stdin pipe, until it succeeds to write all data.