4.1. Writing RADICAL-SAGA Adaptors
Note
This part of the RADICAL-SAGA documentation is not for users of RADICAL-SAGA, but rather for implementors of backend adaptors (although it may be beneficial for users to skim over section Adaptor Binding to gain some insight into RADICAL-SAGA’s mode of operation).
4.1.1. Adaptor Structure

A RADICAL-SAGA adaptor is a Python module with a well-defined structure. The module must expose a class Adaptor, which (a) must be a singleton, (b) must provide a sanity_check() method, and (c) must inherit from radical.saga.cpi.AdaptorBase. That base class' constructor (__init__) must be called like this:
class Adaptor (rs.cpi.AdaptorBase):

    __metaclass__ = Singleton

    def __init__ (self) :
        rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)

    def sanity_check (self) :
        # FIXME: detect gsissh tool
        pass
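The sanity_check() method is called when the engine loads the adaptor module; it should verify that the adaptor's external dependencies are actually usable on the host. A minimal sketch resolving the FIXME above could look like the following, assuming that raising an exception is how an unusable adaptor is signaled, and using Python 3's shutil.which (both are assumptions for illustration, not taken from the original stub):

import shutil

class Adaptor (rs.cpi.AdaptorBase):

    ...

    def sanity_check (self) :
        # verify that the required command line tools exist -- if they do
        # not, raise, so that the engine will not consider this adaptor
        for tool in ['ssh', 'gsissh'] :
            if not shutil.which (tool) :
                raise RuntimeError ("adaptor disabled: '%s' not found in PATH"
                                    % tool)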
_ADAPTOR_INFO and _ADAPTOR_OPTIONS are module-level Python data structures with the following layout:
_ADAPTOR_NAME    = 'rs.adaptor.gsissh.job'
_ADAPTOR_SCHEMAS = ['ssh', 'gsissh']

_ADAPTOR_OPTIONS = [
    {
        'category'      : _ADAPTOR_NAME,
        'name'          : 'cache_connection',
        'type'          : bool,
        'default'       : True,
        'valid_options' : [True, False],
        'documentation' : 'toggle connection caching',
        'env_variable'  : None
    },
]

_ADAPTOR_INFO = {
    'name'    : _ADAPTOR_NAME,
    'version' : 'v0.1',
    'cpis'    : [
        {
            'type'    : 'rs.job.Service',
            'class'   : 'GSISSHJobService',
            'schemas' : _ADAPTOR_SCHEMAS
        },
        {
            'type'    : 'rs.job.Job',
            'class'   : 'GSISSHJob',
            'schemas' : _ADAPTOR_SCHEMAS
        }
    ]
}
(It is beneficial to specify _ADAPTOR_NAME and _ADAPTOR_SCHEMAS separately, as they are used in multiple places, as explained later on.)
The adaptor classes listed in the _ADAPTOR_INFO (in this example, GSISSHJob and GSISSHJobService) are the classes which are actually bound to a SAGA API object, and provide its functionality. For that purpose, those classes must inherit the respective object's Capability Provider Interface (CPI), as shown in the stub below:
class GSISSHJobService (rs.cpi.job.Service) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

    @SYNC
    def init_instance (self, rm_url, session) :
        self._rm      = rm_url
        self._session = session
The radical.saga.cpi.Base class will make sure that the adaptor classes keep a self._adaptor member, pointing to the adaptor singleton instance (i.e. the module's Adaptor class instance). It will further initialize a logging module (available as self._logger).
Note that the adaptor class' __init__ does not correspond to the API level object __init__ – instead, adaptor class construction is a two-step process, and the actual constructor semantics is mapped to an init_instance() method, which receives the API level constructor arguments.
4.1.2. Adaptor Registration
Any SAGA adaptor must be registered in the Engine in order to be usable. That process is very simple, and performed by the radical.saga.cpi.AdaptorBase class – so all the adaptor has to take care of is the correct initialization of that base class, as described in Adaptor Structure. The AdaptorBase will forward the _ADAPTOR_INFO to the radical.saga.engine.Engine class, where the adaptor will be added to a registry of adaptor classes, hierarchically sorted like this (simplified):
Engine._adaptor_registry =
{
    'rs.job.Service' :
    {
        'gsissh' : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
        'ssh'    : [rs.adaptors.gsissh.job.GSISSHJobService, ...],
        'gram'   : [rs.adaptors.globus.job.GRAMJobService,   ...],
        ...
    },
    'rs.job.Job' :
    {
        'gsissh' : [rs.adaptors.gsissh.job.GSISSHJob, ...],
        'ssh'    : [rs.adaptors.gsissh.job.GSISSHJob, ...],
        'gram'   : [rs.adaptors.globus.job.GRAMJob,   ...],
        ...
    },
    ...
}
That registry is searched when the engine binds an adaptor class instance to a SAGA API object instance – see Adaptor Binding.
4.1.3. Adaptor Binding
Whenever a SAGA API object is created, or whenever any method is invoked on that object, the RADICAL-SAGA implementation needs to (a) select a suitable backend adaptor to perform that operation, and (b) invoke the respective adaptor functionality.
The first part, selecting a specific adaptor for a specific API object instance, is called binding – RADICAL-SAGA binds an adaptor to an object exactly once, at creation time, and the bond remains valid for the lifetime of the API object: on API creation, the API object will request a suitable adaptor from the engine, and will keep it for further method invocations (code simplified):
class Service (object) :

    def __init__ (self, url=None, session=None) :

        self._engine  = getEngine ()
        self._adaptor = self._engine.get_adaptor (self, 'rs.job.Service',
                                                  url.scheme, ..., url, session)

    def run_job (self, cmd, host="", ttype=None) :
        return self._adaptor.run_job (cmd, host, ttype=ttype)

    ...
The Engine.get_adaptor call will iterate through the engine's adaptor registry, and will, for all adaptors which indicated support for the given URL scheme, request an adaptor class instance for the given API class. If an adaptor class instance can successfully be created, the engine will further attempt to call the adaptor class' init_instance method, which will in fact construct an adaptor level representation of the API level object:
class GSISSHJobService (rs.cpi.job.Service) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

    def init_instance (self, url, session) :
        # - check if session contains suitable security tokens for (gsi)ssh
        # - check if endpoint given by 'url' can be served
        # - establish and cache connection to that endpoint, with the session
        #   credentials
        ...
If either the adaptor class instantiation or the init_instance invocation raises an exception, the engine will try the next registered adaptor for that API class / URL scheme combination. If both steps are successful, the adaptor class instance will be returned to the API object's constructor, as shown above.
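The selection logic thus amounts to a simple fall-through over the registered adaptors for a given CPI type and scheme. The following is an illustrative sketch of that loop, not the actual Engine code; in particular, the _module_for() helper (looking up the Adaptor singleton of the module an adaptor class belongs to) is a made-up name:

def get_adaptor (self, api_instance, ctype, schema, *args) :

    for adaptor_class in self._adaptor_registry[ctype][schema] :

        try :
            # the engine knows which module (and thus which Adaptor
            # singleton) each registered class belongs to
            adaptor_singleton = self._module_for (adaptor_class)

            # step 1: create the adaptor class instance (CPI object)
            cpi_instance = adaptor_class (api_instance, adaptor_singleton)

            # step 2: run the actual constructor semantics
            cpi_instance.init_instance (*args)

            return cpi_instance

        except Exception :
            # this adaptor cannot serve the request -- try the next one
            continue

    raise rs.exceptions.NoSuccess ("no adaptor found for '%s' and scheme '%s'"
                                   % (ctype, schema))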
4.1.4. Adaptor State
Instances of adaptor classes will often need to share some state. For example,
different instances of rs.job.Job
running via ssh on a specific host may
want to share a single ssh connection; asynchronous operations on a specific
adaptor may want to share a thread pool; adaptor class instances of a specific
resource adaptor may want to share a notification endpoint. State sharing
supports scalability, and can simplify adaptor code – but also adds some
overhead to exchange and synchronize state between those adaptor class
instances.
The preferred way to share state is to use the adaptor instance (as it was created by the engine while loading the adaptor's module) for state exchange (see section Adaptor Registration) – all adaptor class instances get the spawning adaptor instance passed at creation time:
class GSISSHJobService (rs.cpi.job.Service) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
radical.saga.cpi.Base will make that instance available as self._adaptor. As that adaptor class is part of the adaptor module's code base, and thus under full control of the adaptor developer, it is straightforward to use it for state caching and state exchange. Based on the example code in section Adaptor Structure, a connection caching adaptor class could look like this:
class Adaptor (rs.cpi.AdaptorBase):

    __metaclass__ = Singleton

    def __init__ (self) :
        rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
        self._cache = {}

    ...


class GSISSHJobService (rs.cpi.job.Service) :

    def __init__ (self, api, adaptor) :
        ...
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')
        self._cache = self._adaptor._cache

    @SYNC
    def init_instance (self, rm_url, session) :
        ...
        if not self._rm in self._cache :
            self._cache [self._rm] = setup_connection (self._rm)

    @SYNC
    def run_job (self, cmd) :
        ...
        connection = self._cache [self._rm]
        return connection.run (cmd)

    ...
The adaptor implementor is responsible for the consistency of the shared state, and may need to use locking to ensure proper consistency in multithreaded environments – the self._adaptor instance merely provides a shared container for the data, nothing else. Also, the Adaptor class' destructor should take care of freeing the cached / shared state objects (unless another cleanup mechanism is in place).
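As an illustration of such locking, the connection cache from above could be guarded with a threading lock owned by the Adaptor singleton. The sketch below is illustrative only (the _cache_lock attribute and the setup_connection() helper are assumptions, not part of the RADICAL-SAGA API):

import threading

class Adaptor (rs.cpi.AdaptorBase):

    __metaclass__ = Singleton

    def __init__ (self) :
        rs.cpi.AdaptorBase.__init__ (self, _ADAPTOR_INFO, _ADAPTOR_OPTIONS)
        self._cache      = {}
        self._cache_lock = threading.RLock ()   # guards self._cache


class GSISSHJobService (rs.cpi.job.Service) :

    @SYNC
    def init_instance (self, rm_url, session) :
        self._rm = rm_url
        # create the connection at most once, even if several job service
        # instances for the same endpoint are constructed concurrently
        with self._adaptor._cache_lock :
            if self._rm not in self._adaptor._cache :
                self._adaptor._cache [self._rm] = setup_connection (self._rm)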
4.1.5. Creating API Objects
Several SAGA API objects feature factory-like methods – examples are Directory.open(), job.Service.create_job()/run_job(), and resource.Manager.acquire(). To correctly implement those methods on adaptor level, adaptors need to be able to instantiate the API objects they return. We have seen in section Adaptor Binding that, on API object creation, the Engine will select and bind a suitable adaptor to the object instance. In many cases, however, an implementation of a factory-like API method will want to make sure that the resulting API object is bound to the same adaptor instance as the spawning adaptor class instance itself. For that purpose, all API object constructors will accept two additional parameters: _adaptor (type: radical.saga.cpi.Base or derivative) and _adaptor_state (type: dict). These are also accepted by API objects which normally have no public constructor at all:
class Job (rs.attributes.Attributes, rs.task.Async) :

    def __init__ (self, _adaptor=None, _adaptor_state={}) :

        if not _adaptor :
            raise rs.exceptions.IncorrectState ("rs.job.Job constructor is private")
        ...

        # bind to the given adaptor -- this will create the required adaptor
        # class.  We need to specify a schema for adaptor selection -- and
        # simply choose the first one the adaptor offers.
        engine         = getEngine ()
        adaptor_schema = _adaptor.get_schemas()[0]
        self._adaptor  = engine.bind_adaptor (self, 'rs.job.Job', adaptor_schema,
                                              rs.task.NOTASK, _adaptor, _adaptor_state)
As shown above, _adaptor and _adaptor_state are forwarded to the Engine's bind_adaptor() method, and if present will ensure that the resulting API object is bound to the given adaptor. The _adaptor_state dict will be forwarded to the adaptor class level init_instance() call, and can be used to correctly initialize the state of the new adaptor class instance. An example of adaptor level code for creating a radical.saga.job.Job instance via radical.saga.job.Service.create_job() is below:
class GSISSHJobService (rs.cpi.job.Service) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJobService')

    @SYNC
    def init_instance (self, rm_url, session) :
        ...

    @SYNC
    def create_job (self, jd) :

        state = { 'job_service'     : self,
                  'job_description' : jd,
                  'session'         : self._session}

        return rs.job.Job (_adaptor=self._adaptor, _adaptor_state=state)


class GSISSHJob (rs.cpi.job.Job) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'GSISSHJob')
        ...

    @SYNC
    def init_instance (self, job_info):

        self._session        = job_info['session']
        self._jd             = job_info['job_description']
        self._parent_service = job_info['job_service']

        self._id    = None          # is assigned when calling job.run()
        self._state = rs.job.NEW

        # register ourselves with the parent service
        self._parent_service._update_jobid (self, self._id)
        ...
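From the application's perspective, the factory call then simply returns a ready-to-use, correctly bound rs.job.Job. A short usage sketch (the job description fields are illustrative):

import radical.saga as rs

svc = rs.job.Service ("gsissh://remote.host.net/")

jd  = rs.job.Description ()
jd.executable = '/bin/date'

# internally calls the adaptor's create_job(), which constructs the
# rs.job.Job with _adaptor / _adaptor_state as shown above
job = svc.create_job (jd)
job.run ()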
4.1.6. Exception Handling
RADICAL-SAGA defines a set of exceptions which can be thrown on the various method invocations (see section api_exceptions). Adaptor implementors must ensure that the correct exception types are thrown on the corresponding error conditions. If the API layer encounters a non-SAGA exception from the adaptor layer, it will convert it to a rs.NoSuccess exception. While that will reliably shield the user layer from non-SAGA exception types, it is a lossy translation, and likely to hide the underlying cause of the error. This feature is thus to be considered a safeguard, not a preferred method of error state communication!
An example of adaptor level error handling is below:
class ContextX509 (rs.cpi.Context) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'ContextX509')

    @SYNC
    def init_instance (self, type) :
        if not type.lower () in (schema.lower() for schema in _ADAPTOR_SCHEMAS) :
            raise rs.exceptions.BadParameter \
                  ("the x509 context adaptor only handles x509 contexts - duh!")

    @SYNC
    def _initialize (self, session) :

        if not self._api.user_proxy :
            self._api.user_proxy = "x509up_u%d" % os.getuid()   # fallback

        if not os.path.exists (self._api.user_proxy) or \
           not os.path.isfile (self._api.user_proxy)    :
            raise rs.exceptions.BadParameter ("X509 proxy does not exist: %s"
                                             % self._api.user_proxy)

        try :
            fh = open (self._api.user_proxy)
        except Exception as e:
            raise rs.exceptions.PermissionDenied ("X509 proxy '%s' not readable: %s"
                                                 % (self._api.user_proxy, str(e)))
        else :
            fh.close ()
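To avoid the lossy fallback to rs.NoSuccess, adaptor code should catch backend errors itself and re-raise them as the most specific SAGA exception, preserving the original message. A hedged sketch (the run_remote() helper and the message matching are illustrative only):

@SYNC
def run_job (self, cmd) :

    try :
        return run_remote (self._rm, cmd)            # hypothetical backend call

    except Exception as e :
        # pick the most specific SAGA exception type, and keep the
        # original error message for diagnostics
        if 'permission denied' in str(e).lower() :
            raise rs.exceptions.PermissionDenied ("cannot run job on %s: %s"
                                                  % (self._rm, e))
        raise rs.exceptions.NoSuccess ("job submission on %s failed: %s"
                                       % (self._rm, e))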
4.1.7. Asynchronous Methods
The SAGA API features several objects which implement both synchronous and asynchronous versions of their respective methods. Synchronous calls will return normal objects or values; asynchronous calls will return radical.saga.Task instances, which represent the ongoing asynchronous method, and can later be inspected for state and return values.

On adaptor level, the two method types are distinguished by the method decorators @SYNC and @ASYNC, like this:
class LocalFile (rs.cpi.filesystem.File) :

    def __init__ (self, api, adaptor) :
        rs.cpi.Base.__init__ (self, api, adaptor, 'LocalFile')

    @SYNC
    def init_instance (self, url, flags, session) :
        self._url     = url
        self._flags   = flags
        self._session = session
        ...

    @ASYNC
    def init_instance_async (self, ttype, url, flags, session) :
        self._url     = url
        self._flags   = flags
        self._session = session

        t = rs.task.Task ()
        t._set_result (rs.filesystem.File (url, flags, session,
                                           _adaptor_name=_ADAPTOR_NAME))
        t._set_state  (rs.task.DONE)
        return t

    @SYNC
    def get_url (self) :
        return self._url

    @ASYNC
    def get_url_async (self, ttype) :

        t = rs.task.Task ()
        t._set_result (self._url)
        t._set_state  (rs.task.DONE)
        return t
Note that the async calls in the example code above are not really asynchronous, as they both return a task which is in Done state – a proper async call would return a task in New or Running state, without setting the task's result, and would perform the required work in a separate thread or process. Upon completion, the adaptor (which should keep a handle on the created task) would then set the result and state of the task, thus notifying the application of the completion of the asynchronous method call.
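A genuinely asynchronous variant could run the work in a helper thread and update the task upon completion, roughly as sketched below (illustrative only; the threading approach and the _fetch_url() helper are assumptions, and a real adaptor would likely use a shared thread pool):

import threading

@ASYNC
def get_url_async (self, ttype) :

    t = rs.task.Task ()
    t._set_state (rs.task.RUNNING)

    def _worker () :
        # perform the (potentially slow) operation out of band, then
        # update the task -- this is what notifies the application
        result = self._fetch_url ()           # hypothetical slow helper
        t._set_result (result)
        t._set_state  (rs.task.DONE)
        # (error handling, i.e. moving the task to a failed state, is
        #  omitted in this sketch)

    threading.Thread (target=_worker).start ()
    return t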
Also note that there exists an asynchronous version of the init_instance() method, which is used for asynchronous API object creation, i.e. on:

import sys
import radical.saga as rs

t = rs.job.Service.create ('ssh://host.net')
t.wait ()

if t.state != rs.task.DONE :
    print ("no job service: " + str(t.exception))
    sys.exit (0)

job_service = t.get_result ()
job_service.run_job ("touch /tmp/hello_world")
The exact semantics of SAGA's asynchronous operations is described elsewhere (see section api_tasks). Relevant for this discussion is to note that the asynchronous adaptor methods all receive a task type parameter (ttype) which determines the state of the task to return: on ttype==rs.task.TASK, the returned task should be in New state; on ttype==rs.task.ASYNC, the task is in Running state; and on ttype==rs.task.SYNC, the returned task is already Done. It is up to the adaptor implementor to ensure that semantics – the examples above do not follow it, and are thus incomplete.
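Honoring the ttype parameter could then look like the following sketch, where the actual work is deferred to a hypothetical _start_worker() helper (again an illustration of the required semantics, not actual adaptor code):

@ASYNC
def get_url_async (self, ttype) :

    t = rs.task.Task ()

    if   ttype == rs.task.SYNC :
        # perform the work right away and return a completed task
        t._set_result (self._url)
        t._set_state  (rs.task.DONE)

    elif ttype == rs.task.ASYNC :
        # start the work in the background and return a running task
        t._set_state (rs.task.RUNNING)
        self._start_worker (t)                # hypothetical helper

    elif ttype == rs.task.TASK :
        # do not start the work yet -- the application will call task.run()
        t._set_state (rs.task.NEW)

    return t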
4.1.8. Bulk Operations
On API level, there exists no explicit support for bulk operations. Those can,
however, be rendered implicitly, by collecting asynchronous operations in a task
container, and calling run()
on that container:
bulk  = rs.task.Container ()

dir_1 = rs.filesystem.Directory ("gridftp://remote.host1.net/")

for i in range (0, 1000) :
    src = "gridftp://remote.host1.net/data/file_%4d.dat"  % i
    tgt = "gridftp://other.hostx.net/share/file_%4d.dat"  % i
    bulk.add (dir_1.copy (src, tgt, rs.task.TASK))

dir_2 = rs.filesystem.Directory ("ssh://remote.host2.net/")

for i in range (0, 1000) :
    src = "ssh://remote.host2.net/data/file_%4d.info"  % i
    tgt = "ssh://other.hostx.net/share/file_%4d.info"  % i
    bulk.add (dir_2.copy (src, tgt, rs.task.TASK))

bulk.run  ()
bulk.wait (rs.task.ALL)
The above task container gets filled by file copy tasks which are addressing two different file transfer protocols, and are thus likely mapped to two different adaptors. The RADICAL-SAGA API implementation will inspect the task container upon bulk.run(), and will attempt to sort the contained tasks by adaptor, i.e. all tasks operating on API objects which bind to the same adaptor instance (not adaptor class instance) will be lumped together into a task bucket. For each bucket, the API will then call the respective bulk operation (container_method) for that adaptor.
Note that at this point, the task container implementation does not yet know which adaptor class instance to use for the bulk operations. For that purpose, it will inspect ...

Todo

Needs completion after generic bulk ops are fixed.
4.1.9. Adaptor Logging
Based on Python's logging facility, RADICAL-SAGA also supports logging, both as an internal auditing and debugging mechanism, and as an application supporting capability (see section util_logging). Adaptor implementors are encouraged to use logging as well – for that purpose, the radical.saga.cpi.AdaptorBase and radical.saga.cpi.Base classes will initialize a self._logger member for all adaptor and adaptor class implementations, respectively.

We advise to use the log levels as indicated below:
Log Level   Type of Events Logged

CRITICAL    Only fatal events that will cause the process to abort – events that should NEVER happen on adaptor level!
ERROR       Events that will prevent the adaptor from functioning correctly.
WARNING     Events that indicate potential problems or unusual events, and can support application diagnostics.
INFO        Events that support adaptor auditing, inform about backend activities, performance, etc.
DEBUG       Events which support the tracking of adaptor code paths, to support adaptor debugging (lots of output).
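For illustration, adaptor code might then use the logger along these lines (a sketch; the messages and the setup_connection() helper are made up):

@SYNC
def init_instance (self, rm_url, session) :

    self._logger.info ("connecting to %s" % rm_url)           # backend activity

    try :
        self._connection = setup_connection (rm_url)          # hypothetical helper
        self._logger.debug ("connection to %s established" % rm_url)

    except Exception as e :
        self._logger.error ("connection to %s failed: %s" % (rm_url, e))
        raise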
4.1.10. External Processes
For many adaptor implementations, it is beneficial to interface to external processes. In particular, all adaptors performing remote interactions over ssh or gsissh secured connections will likely need to interact with the remote user shell. The classes radical.saga.utils.pty_process.PTYProcess and (on top of PTYProcess) radical.saga.utils.pty_shell.PTYShell are designed to simplify those interactions, and at the same time be more performant than, for example, pexpect.
class radical.saga.utils.pty_shell.PTYShell (url, session=None, logger=None, cfg=None, posix=True, interactive=True)

Bases: object

This class wraps a shell process and runs it as a PTYProcess. The user of this class can start that shell, and run arbitrary commands on it.

The shell to be run is expected to be POSIX compliant (bash, dash, sh, ksh etc.) – in particular, we expect the following features: $?, $!, $#, $*, $@, $$, $PPID, >&, >>, >, <, |, ||, (), &, &&, wait, kill, nohup, shift, export, PS1, and PS2.

Note that PTYShell will change the shell prompts (PS1 and PS2) to simplify output parsing. PS2 will be empty, and PS1 will be set to PROMPT-$?-> – that way, the prompt will report the exit value of the last command, saving an extra roundtrip. Users of this class should be careful when setting other prompts – see set_prompt() for more details.

Usage Example:
# start the shell, find its prompt
self.shell = saga.utils.pty_shell.PTYShell ("ssh://user@rem.host.net/",
                                            contexts, self._logger)

# run a simple shell command, merge stderr with stdout.  $$ is the pid
# of the shell instance.
ret, out, _ = self.shell.run_sync (" mkdir -p /tmp/data.$$/" )

# check if mkdir reported success
if ret != 0 :
    raise saga.NoSuccess ("failed to prepare dir (%s)(%s)" % (ret, out))

# stage some data from a local string variable
# into a file on the remote system
self.shell.stage_to_remote (src = pbs_job_script,
                            tgt = "/tmp/data.$$/job_1.pbs")

# check size of staged script (this is actually done on PTYShell level
# already, with no extra hop):
ret, out, _ = self.shell.run_sync ("stat -c '%s' /tmp/data.$$/job_1.pbs")

if ret != 0 :
    raise saga.NoSuccess ("failed to check size (%s)(%s)" % (ret, out))

assert (len(pbs_job_script) == int(out))
Data Staging and Data Management:
The PTYShell class supports not only command execution, but also basic data management: for SSH based shells, it will create a tunneled scp/sftp connection for file staging. Other data management operations (mkdir, size, list, …) are executed either as shell commands or on the scp/sftp channel (if possible on the data channel, to keep the shell pty free for concurrent command execution). SSH tunneling is implemented via the ssh v2 'ControlMaster' capability (see ssh_config(5)).
For local shells, PTYShell will create an additional shell pty for data management operations.
Asynchronous Notifications:
A third pty process will be created for asynchronous notifications. For that purpose, the shell started on the first channel will create a named pipe, at:

$HOME/.radical/saga/adaptors/shell/async.$$

$$ here represents the pid of the shell process. It will also set the environment variable SAGA_ASYNC_PIPE to point to that named pipe – any application running on the remote host can write event messages to that pipe, which will be available on the local end (see below). PTYShell leaves it unspecified what format those messages have, but messages are expected to be separated by newlines.

An adaptor using PTYShell can subscribe for messages via:
self.pty_shell.subscribe (callback)
where callback is a Python callable. PTYShell will listen on the event channel in a separate thread and invoke that callback on any received message, passing the message text (sans newline) to the callback.
An example usage: the command channel may run the following command line:
( sh -c 'sleep 100 && echo "job $$ done" > $SAGA_ASYNC_PIPE || echo "job $$ fail" > $SAGA_ASYNC_PIPE' ) &
which will return immediately, and send a notification message at job completion.
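A minimal subscription sketch on the adaptor side could look like this (the callback body and the _update_job_state() helper are purely illustrative):

def _on_notification (self, message) :
    # invoked by PTYShell's listener thread for every line written to
    # the remote SAGA_ASYNC_PIPE
    self._logger.info ("async notification: %s" % message)

    if message.endswith ("done") :
        self._update_job_state (rs.job.DONE)      # hypothetical helper

...

self.pty_shell.subscribe (self._on_notification)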
Note that writes to named pipes are not atomic. From POSIX:

"A write is atomic if the whole amount written in one operation is not interleaved with data from any other process. This is useful when there are multiple writers sending data to a single reader. Applications need to know how large a write request can be expected to be performed atomically. This maximum is called {PIPE_BUF}. This volume of IEEE Std 1003.1-2001 does not say whether write requests for more than {PIPE_BUF} bytes are atomic, but requires that writes of {PIPE_BUF} or fewer bytes shall be atomic."

Thus the user is responsible for ensuring that either messages are smaller than PIPE_BUF bytes on the remote system (usually at least 1024, on Linux usually 4096), or for locking the pipe on larger writes.
Automated Restart, Timeouts:

For timeout and restart semantics, please see the documentation of the underlying saga.utils.pty_process.PTYProcess class.

alive (recover=False)

The shell is assumed to be alive if the shell process lives. If recover==True, an attempt is made to restart the shell.
find_prompt (timeout=2.0)

If run_async was called, a command is running on the shell. find_prompt can be used to collect its output up to the point where the shell prompt re-appears (i.e. when the command finishes).

Note that this method blocks until the command finishes. Future versions of this call may add a timeout parameter.
find (patterns, timeout=-1)

Note that this method blocks until one of the given patterns is found in the shell I/O.
set_prompt (new_prompt)

Parameters: new_prompt (string) – a regular expression matching the shell prompt

The new_prompt regex is expected to be a regular expression with one set of catching brackets, which MUST return the previous command's exit status. This method will send a newline to the client, and expects to find the prompt with the exit value '0'.

As a side effect, this method will discard all previous data on the pty, thus effectively flushing the pty output.

By encoding the exit value in the command prompt, we save one roundtrip. The prompt on POSIX compliant shells can be set, for example, via:

PS1='PROMPT-$?->'; export PS1

The newline in the example above allows to nicely anchor the regular expression, which would look like:

PROMPT-(\d+)->$

The regex is compiled with 're.DOTALL', so the dot character matches all characters, including line breaks. Be careful not to match more than the exact prompt – otherwise, a prompt search will swallow stdout data. For example, the following regex:

PROMPT-(.+)->$

would capture arbitrary strings, and would thus match all of:

PROMPT-0->ls
data/ info
PROMPT-0->

and thus swallow the ls output…

Note that the string match before the prompt regex is non-greedy – if the output contains multiple occurrences of the prompt, only the match up to the first occurrence is returned.
run_sync (command, iomode=None, new_prompt=None)

Run a shell command, and report exit code, stdout and stderr (all three will be returned in a tuple). The call will block until the command finishes (more exactly, until we find the prompt again on the shell's I/O stream), and cannot be interrupted.

Parameters:
- command (string) – shell command to run.
- iomode (enum) – defines how stdout and stderr are captured.
- new_prompt (string) – regular expression matching the prompt after the command succeeded.

We expect the command not to do stdio redirection, as we want to capture that separately. We do allow pipes and stdin/stdout redirection. Note that SEPARATE mode will break if the job is run in the background.

The following iomode values are valid:

- IGNORE: both stdout and stderr are discarded, None will be returned for each.
- MERGED: both streams will be merged and returned as stdout; stderr will be None. This is the default.
- SEPARATE: stdout and stderr will be captured separately, and returned individually. Note that this will require at least one more network hop!
- STDOUT: only stdout is captured, stderr will be None.
- STDERR: only stderr is captured, stdout will be None.
- None: do not perform any redirection – this is effectively the same as MERGED.

If any of the requested output streams does not return any data, an empty string is returned.

If the command to be run changes the prompt to be expected for the shell, the new_prompt parameter MUST contain a regex to match the new prompt. The same conventions as for set_prompt() hold – i.e. we expect the prompt regex to capture the exit status of the process.
run_async (command)

Run a shell command, but don't wait for the prompt – just return. It is up to the caller to eventually search for the prompt again (see find_prompt()). Meanwhile, the caller can interact with the called command via the I/O channels.

Parameters: command (string) – shell command to run. For async execution, we don't care if the command is doing I/O redirection or not.
write_to_remote (src, tgt)

Parameters:
- src (string) – data to be staged into the target file
- tgt (string) – path to the target file to stage to. The tgt path is not a URL, but is expected to be a path relative to the shell's URL.

The content of the given string is pasted into a file (specified by tgt) on the remote system. If that file exists, it is overwritten. A NoSuccess exception is raised if writing the file was not possible (missing permissions, incorrect path, etc.).
read_from_remote (src)

Parameters: src (string) – path to the source file to stage from. The src path is not a URL, but is expected to be a path relative to the shell's URL.
stage_to_remote (src, tgt, cp_flags=None)

Parameters:
- src (string) – path of the local source file to stage from. The src path is not a URL, but is expected to be a path relative to the current working directory.
- tgt (string) – path to the target file to stage to. The tgt path is not a URL, but is expected to be a path relative to the shell's URL.
stage_from_remote (src, tgt, cp_flags='')

Parameters:
- src (string) – path to the source file to stage from. The src path is not a URL, but is expected to be a path relative to the shell's URL.
- tgt (string) – path of the local target file to stage to. The tgt path is not a URL, but is expected to be a path relative to the current working directory.
run_copy_to (src, tgt, cp_flags=None)

This initiates a slave copy connection. src is interpreted as a local path, tgt as a path on the remote host.

Now, this is ugly when done over sftp: sftp supports recursive copy and wildcards, all right – but for recursive copies, it wants the target dir to exist, so we have to check if the local src is a dir, and if so, we first create the target before the copy. Worse, for wildcards we have to do a local expansion, and then do the same for each entry…
class radical.saga.utils.pty_process.PTYProcess (command, cfg='utils', logger=None)

Bases: object

This class spawns a process, providing that child with pty I/O channels – it will maintain stdin, stdout and stderr channels to the child. All write-like operations operate on the stdin, all read-like operations operate on the stdout stream. Data from the stderr stream are at this point redirected to the stdout channel.

Example:

# run an interactive client process
pty = PTYProcess ("/usr/bin/ssh -t localhost")

# check client's I/O for one of the following patterns (prompts),
# then search again.
prompts = ['password\s*:\s*$',
           'want to continue connecting.*\(yes/no\)\s*$',
           '[\$#>]\s*$']
n, match = pty.find (prompts)

while True :

    if n == 0 :
        # found password prompt - tell the secret
        pty.write (b"secret\n")
        n, _ = pty.find (prompts)

    elif n == 1 :
        # found request to accept host key - sure we do... (who checks
        # those keys anyways...?).  Then search again.
        pty.write (b"yes\n")
        n, _ = pty.find (prompts)

    elif n == 2 :
        # found shell prompt!  Wohoo!
        break

while True :
    # go full Dornroeschen (Sleeping Beauty)...
    if not pty.alive (recover=True) :           # check / restart process
        break
    pty.find  (['[\$#>]\s*$'])                  # find shell prompt
    pty.write (b'/bin/sleep "100 years"\n')     # sleep!  SLEEEP!

# something bad happened
print (pty.autopsy ())
wait ()

Blocks forever until the child finishes on its own, or is getting killed.

Actually, we might just as well try to figure out what is going on on the remote end of things – so we read the pipe until the child dies…
alive (recover=False)

Try to determine if the child process is still active. If not, mark the child as dead and close all I/O descriptors etc. (see finalize()).

If recover is True and the child is indeed dead, we attempt to re-initialize it (initialize()). We only do that a limited number of times (self.recover_max) before giving up – at that point it seems likely that the child exits due to a recurring operational condition.

Note that upstream consumers of the PTYProcess should be careful to only use recover=True when they can indeed handle a disconnected/reconnected client at that point, i.e. if there are no assumptions on persistent state beyond those in control of the upstream consumers themselves.
read (size=0, timeout=0, _force=False)

Read some data from the child. By default, the method reads whatever is available on the next read, up to _CHUNKSIZE, but other read sizes can be specified.

The method will return whatever data it has at timeout:

timeout == 0 : return the content of the first successful read, with
               whatever data up to 'size' have been found.
timeout <  0 : return after the first read attempt, even if no data have
               been available.

If no data are found, the method returns an empty string (not None).

This method will not fill the cache, but will just read whatever data it needs (FIXME).

Note: the returned lines do not get '\r' stripped.
find (patterns, timeout=0)

This method reads bytes from the child process until a string matching any of the given patterns is found. If that is found, all read data are returned as a string, up to (and including) the match. Note that a pattern can match an empty string, and the call then will return just that, an empty string. If all patterns end with matching a newline, this method is effectively matching lines – but note that '$' will also match the end of the (currently available) data stream.

The call actually returns a tuple, containing the index of the matching pattern, and the string up to the match as described above.

If no pattern is found before timeout, the call returns (None, None). Negative timeouts will block until a match is found.

Note that the patterns are interpreted with the re.M (multi-line) and re.S (dot matches all) regex flags.

Performance: the call is doing repeated string regex searches over whatever data it finds. On complex regexes, and large data, and small read buffers, this method can be expensive.

Note: the returned data get '\r' stripped.

Note: ANSI escape sequences are also stripped before matching, but are kept in the returned data.