# Source code for radical.saga.utils.pty_process


__author__    = "Andre Merzky"
__copyright__ = "Copyright 2012-2013, The SAGA Project"
__license__   = "MIT"


import re
import os
import sys
import pty
import time
import errno
import shlex
import codecs
import select
import signal
import termios
import threading as mt

import radical.utils            as ru

from   .  import pty_exceptions as ptye
from   .. import exceptions     as se


# --------------------------------------------------------------------
#
# Tunables shared by the read / write / find loops of PTYProcess.
_CHUNKSIZE = 1024 * 1024  # default size of each read
_POLLDELAY = 0.01         # seconds in between read attempts
_DEBUG_MAX = 600          # log payloads longer than this many characters are
                          # abbreviated to their first and last 30 characters


# --------------------------------------------------------------------
#
class PTYProcess(object):
    r"""
    This class spawns a process, providing that child with pty I/O channels --
    it will maintain stdin, stdout and stderr channels to the child.  All
    write-like operations operate on the stdin, all read-like operations
    operate on the stdout stream.  Data from the stderr stream are at this
    point redirected to the stdout channel.

    Example::

        # run an interactive client process
        pty = PTYProcess("/usr/bin/ssh -t localhost")

        # check client's I/O for one of the following patterns (prompts).
        prompts = [r'password\s*:\s*$',
                   r'want to continue connecting.*\(yes/no\)\s*$',
                   r'[\$#>]\s*$']
        n, match = pty.find(prompts)

        while True:

            if n == 0:
                # found password prompt - tell the secret, search again
                pty.write(b"secret\n")
                n, _ = pty.find(prompts)

            elif n == 1:
                # found request to accept host key - accept, search again
                pty.write(b"yes\n")
                n, _ = pty.find(prompts)

            elif n == 2:
                # found shell prompt!
                break

        while True:
            # go full Dornroeschen (Sleeping Beauty)...
            if not pty.alive(recover=True):       # check / restart process
                break
            pty.find([r'[\$#>]\s*$'])              # find shell prompt
            pty.write('/bin/sleep "100 years"\n')  # sleep!

        # something bad happened
        print(pty.autopsy())
    """

    # matches ANSI escape sequences of the form `ESC ... m`; compiled once
    # instead of on every `find()` iteration
    _ANSI_ESCAPE = re.compile(r'\x1b[^m]*m')

    # number of trailing characters of delivered output kept in `self.tail`
    _TAIL_SIZE = 256

    # ----------------------------------------------------------------
    #
    def __init__(self, command, cfg='utils', logger=None):
        """
        The class constructor, which runs (execvpe) command in a separately
        forked process.  The new process will inherit the environment of the
        application process.

        :type  command: string or list of strings
        :param command: The given command is what is run as a child, and
                        fed/drained via pty pipes.  If given as string,
                        command is split into an array of strings, using
                        :func:`shlex.split`.

        :param cfg:     if a string, it is passed as `name` to `ru.Config`;
                        anything else is passed as `cfg`.

        :type  logger:  :class:`radical.utils.logger.Logger` instance
        :param logger:  logger stream to send status messages to.

        :raises:        `se.BadParameter` for an unusable command; whatever
                        `ptye.translate_exception` produces if the child
                        could not be started.
        """

        self.rlock = mt.RLock()

        # Set up all state *before* anything can raise, so that `finalize()`,
        # `flush()` and `__del__()` never trip over a missing attribute on
        # a partially constructed instance.
        self.cache            = ""    # data cache
        self.tail             = ""    # tail of data cache for error messages
        self.child            = None  # pid of the child, from pty.fork()
        self.ptyio            = None  # the process' io channel (unused here)
        self.child_fd         = None  # pty master fd, from pty.fork()
        self.parent_in        = None  # fd we write to    (== child_fd)
        self.parent_out       = None  # fd we read from   (== child_fd)
        self.exit_code        = None  # child died with code (may be revived)
        self.exit_signal      = None  # child kill by signal (may be revived)
        self.recover_max      = 3     # TODO: make configure option.  This does
        self.recover_attempts = 0     # not apply for recovers triggered by
                                      # gc_timeout!
        self._decoder         = None  # incremental utf-8 decoder, see _decode

        self.logger = logger
        if not self.logger:
            self.logger = ru.Logger('radical.saga.pty')
        self.logger.debug("PTYProcess init %s" % self)

        name = None
        if isinstance(cfg, str):
            name = cfg
            cfg  = None
        self.cfg = ru.Config('radical.saga.session', name=name, cfg=cfg)
        self.cfg = self.cfg.pty

        if isinstance(command, str):
            command = shlex.split(command)

        if not isinstance(command, list):
            raise se.BadParameter("PTYProcess expects string or list command")

        if len(command) < 1:
            raise se.BadParameter("PTYProcess expects non-empty command")

        self.command = command        # list of strings to run

        try:
            self.initialize()

        except Exception as e:
            raise ptye.translate_exception(
                      e, "pty or process creation failed") from e

    # --------------------------------------------------------------------
    #
    def __del__(self):
        """
        Need to free pty's on destruction, otherwise we might run out of
        them (see cat /proc/sys/kernel/pty/max).
        """

        # Best effort only: a destructor must never raise, and the instance
        # may be only partially constructed.  `Exception` (not a bare except)
        # so that KeyboardInterrupt / SystemExit are not swallowed.
        try:
            with self.rlock:
                self.finalize()
        except Exception:
            pass

    # ----------------------------------------------------------------------
    #
    def flush(self):
        """
        Clean the *read* data cache: drain whatever the child has pending,
        then drop the cache.  Discarded data are logged as warnings.  Only
        use when you know what you are doing!
        """

        if not self.parent_out:
            # nothing to read, yet, so there is nothing to cache, nor to flush
            return

        self.logger.debug("flush: [%5d] [ ] (flush pty read cache)",
                          self.parent_out)

        # lets see if there are still things to read
        while True:
            try:
                tmp = self.read(timeout=0.1)
            except Exception:
                self.logger.warn('read error on flush')
                break

            if not tmp:
                break

            self.logger.warn("flush: [%5d] [%5d] (discard data : '%s')",
                             self.parent_out, len(tmp), tmp)

        if len(self.cache):
            self.logger.warn("flush: [%5d] [%5d] (discard cache: '%s')",
                             self.parent_out, len(self.cache), self.cache)
        self.cache = ""

    # ----------------------------------------------------------------------
    #
    def _hide_data(self, data, nolog=False):
        """
        Return `data` unchanged, or, if `nolog` is set, a copy in which every
        character except newline is replaced by 'X' (used to keep secrets
        out of the log).
        """

        if nolog:
            return re.sub(r'([^\n])', 'X', data)

        return data

    # ----------------------------------------------------------------------
    #
    def _decode(self, buf):
        """
        Decode a chunk of raw bytes read from the pty as UTF-8.

        An incremental decoder is used so that a multi-byte character which
        is split across two reads is buffered and completed by the next
        chunk.  Undecodable bytes are replaced (U+FFFD) rather than aborting
        the read.
        """

        decoder = getattr(self, '_decoder', None)
        if decoder is None:
            decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
            self._decoder = decoder

        return decoder.decode(buf)

    # ----------------------------------------------------------------------
    #
    def _cache_satisfies(self, size):
        """
        True if the cache holds data and can serve a read of `size`
        characters (`size == 0`: any amount will do).
        """

        return bool(self.cache) and (not size or size <= len(self.cache))

    # ----------------------------------------------------------------------
    #
    def _take_cached(self, size=0):
        """
        Remove and return the first `size` characters of the cache (all of
        it for `size == 0`), and remember the returned data in `self.tail`.
        """

        if size:
            ret        = self.cache[:size]
            self.cache = self.cache[size:]
        else:
            ret        = self.cache
            self.cache = ""

        self.tail = (self.tail + ret)[-self._TAIL_SIZE:]
        return ret

    # ----------------------------------------------------------------------
    #
    def initialize(self):
        """
        Fork the child on a new pty and exec `self.command` in it.  In the
        parent, terminal echo is switched off on the pty and the pty fd is
        registered as read and write channel.  Does nothing (but warn) if
        a child already exists.

        :raises: `se.NoSuccess` if the fork fails.
        """

        with self.rlock:

            # already initialized?
            if self.child:
                self.logger.warn("init race: %s" % ' '.join(self.command))
                return

            self.logger.info("running: %s" % ' '.join(self.command))

            # create the child
            try:
                self.child, self.child_fd = pty.fork()
            except Exception as e:
                raise se.NoSuccess("Could not run (%s): %s"
                                   % (' '.join(self.command), e)) from e

            if not self.child:
                # this is the child
                try:
                    # all I/O set up, have a pty (*fingers crossed*), lift-off!
                    os.execvpe(self.command[0], self.command, os.environ)

                except Exception as e:
                    self.logger.error("Could not execute (%s): %s"
                                      % (' '.join(self.command), e))
                finally:
                    # Only reached if exec failed.  Leave via os._exit() so
                    # the forked child never unwinds into the parent's call
                    # stack, exception handlers or atexit hooks.  255 is the
                    # status the former `sys.exit(-1)` produced.
                    os._exit(255)

            # this is the parent.  Register the channels first, so that
            # `finalize()` can close the fd even if the termios calls fail.
            self.parent_in  = self.child_fd
            self.parent_out = self.child_fd
            self._decoder   = None   # new child: drop stale decoder state

            new = termios.tcgetattr(self.child_fd)
            new[3] = new[3] & ~termios.ECHO
            termios.tcsetattr(self.child_fd, termios.TCSANOW, new)

    # --------------------------------------------------------------------
    #
    def finalize(self, wstat=None):
        """
        Kill the child, close all I/O channels.

        :param wstat: wait status as obtained by a previous `os.waitpid` on
                      the child.  If given, the child is considered to be
                      reaped already and is not killed again; the status is
                      only evaluated for `exit_code` / `exit_signal`.
        """

        with self.rlock:

            # now we can safely kill the process
            # (unless some wait did that before)
            if wstat is None and self.child:

                # yes, we have something to kill!
                try:
                    os.kill(self.child, signal.SIGKILL)
                except OSError:
                    pass

                # hey, kiddo, how did that go?
                for _ in range(10):

                    try:
                        wpid, status = os.waitpid(self.child, os.WNOHANG)

                    except OSError as e:
                        if e.errno == errno.ECHILD:
                            # this should not have failed -- child
                            # disappeared?
                            self.exit_code   = None
                            self.exit_signal = None
                        else:
                            # other errors are bad, but there is not much to
                            # be done at this point
                            self.logger.warn("ignore waitpid error %s" % e)
                        break

                    if wpid:
                        # Only a status which belongs to a reaped child is
                        # meaningful: with WNOHANG, a child which is still
                        # around yields (0, 0), which must not be mistaken
                        # for "exited with code 0".
                        wstat = status
                        break

                    time.sleep(0.1)

            # at this point, we declare the process to be gone for good
            self.child = None

            # lets see if we can perform some post-mortem analysis
            if wstat is not None:

                if os.WIFEXITED(wstat):
                    # child died of natural causes - perform autopsy...
                    self.exit_code   = os.WEXITSTATUS(wstat)
                    self.exit_signal = None

                elif os.WIFSIGNALED(wstat):
                    # child got killed by someone - recover evidence...
                    self.exit_code   = None
                    self.exit_signal = os.WTERMSIG(wstat)

            # Forget the fd *before* closing it, so that a failing close can
            # never lead to a second close of a possibly recycled descriptor.
            # `parent_in` refers to the same fd and needs no separate close.
            fd, self.parent_out = self.parent_out, None
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass

    # --------------------------------------------------------------------
    #
    def wait(self):
        """
        Blocks forever until the child finishes on its own, or is getting
        killed.

        Actually, we might just as well try to figure out what is going on on
        the remote end of things -- so we read the pipe until the child dies.

        :return: all data read from the child while waiting.
        :raises: `se.NoSuccess` if `waitpid` fails unexpectedly.
        """

        chunks = []

        # read until the read fails, which is how the end of the stream shows
        while True:
            try:
                chunks.append(self.read())
            except Exception:
                break

        output = "".join(chunks)

        # yes, for ever and ever...
        while True:

            # we need to lock, as the SIGCHLD will only arrive once.  The
            # child is checked under the lock so that nobody can finalize it
            # between the check and the waitpid.
            with self.rlock:

                if not self.child:
                    # this was quick ;-)
                    return output

                # hey, kiddo, whats up?
                try:
                    wpid, wstat = os.waitpid(self.child, 0)

                except OSError as e:

                    if e.errno == errno.ECHILD:
                        # child disappeared
                        self.exit_code   = None
                        self.exit_signal = None
                        self.finalize()
                        return output

                    # no idea what happened -- it is likely bad
                    raise se.NoSuccess("waitpid failed on wait") from e

                # did we get a note about child termination?
                if 0 == wpid:
                    # nope, all is well - carry on
                    continue

                # Yes, we got a note.
                # Well, maybe the child fooled us and is just playing dead?
                if os.WIFSTOPPED(wstat) or os.WIFCONTINUED(wstat):
                    # we don't care if someone stopped/resumed the child --
                    # that is up to higher powers.  For our purposes, the
                    # child is alive.
                    continue

                # not stopped - so it is dead.  Make sure it stays dead, to
                # avoid zombies.
                self.child = None
                self.finalize(wstat=wstat)

                return output

    # --------------------------------------------------------------------
    #
    def alive(self, recover=False):
        """
        Try to determine if the child process is still active.  If not, mark
        the child as dead and close all IO descriptors etc
        (:func:`finalize`).

        If `recover` is `True` and the child is indeed dead, we attempt to
        re-initialize it (:func:`initialize`).  We only do that for so many
        times (`self.recover_max`) before giving up -- at that point it seems
        likely that the child exits due to a re-occurring operations
        condition.

        Note that upstream consumers of the :class:`PTYProcess` should be
        careful to only use `recover=True` when they can indeed handle
        a disconnected/reconnected client at that point, i.e. if there are no
        assumptions on persistent state beyond those in control of the
        upstream consumers themselves.

        :return: `True` if a child is running, `False` otherwise.
        :raises: `RuntimeError` if `waitpid` fails unexpectedly.
        """

        with self.rlock:

            # do we have a child which we can check?
            if self.child:

                wstat = None

                while True:

                    # hey, kiddo, whats up?
                    try:
                        wpid, wstat = os.waitpid(self.child, os.WNOHANG)

                    except OSError as e:

                        if e.errno == errno.ECHILD:
                            # child disappeared, go to zombie cleanup routine
                            wstat = None
                            break

                        raise RuntimeError("waitpid failed on wait (%s)"
                                           % e) from e

                    # did we get a note about child termination?
                    if 0 == wpid:
                        # nope, all is well - carry on
                        return True

                    # Yes, we got a note.
                    # Well, maybe the child fooled us and is just playing
                    # dead?
                    if os.WIFSTOPPED(wstat) or os.WIFCONTINUED(wstat):
                        # we don't care if someone stopped/resumed the child
                        # -- that is up to higher powers.  For our purposes,
                        # the child is alive.
                        continue

                    break

                # so its dead -- make sure it stays dead, to avoid zombies
                self.child = None
                self.finalize(wstat=wstat)

            # check if we can attempt a post-mortem revival though
            if not recover:
                # nope - revival not allowed.
                return False

            # we are allowed to revive!  So can we try one more time?
            if self.recover_attempts >= self.recover_max:
                # nope, its gone for good - just report the sad news
                return False

            # MEDIC!
            self.recover_attempts += 1
            self.initialize()

            # well, now we don't trust the child anymore, of course!  So we
            # check again.  Yes, this is recursive -- but note that
            # recover_attempts get incremented on every iteration, and this
            # will eventually lead to call termination.
            return self.alive(recover=True)

    # --------------------------------------------------------------------
    #
    def autopsy(self):
        """
        Return diagnostics information string for dead child processes:
        exit code, exit signal, and the last output seen.  For a live child,
        a short "false alarm" note is returned instead.
        """

        with self.rlock:

            if self.child:
                # Boooh!
                return "false alarm, process %s is alive!" % self.child

            # try a last read to grab whatever we can get (from cache)
            data = ''
            try:
                data  = self.tail
                data += self.read(size=0, timeout=-1)
            except Exception:
                # best effort: the channel of a dead child is usually closed
                pass

            ret  = ""
            ret += " exit code : %s\n" % self.exit_code
            ret += " exit signal: %s\n" % self.exit_signal
            ret += " last output: %s\n" % data

            return ret

    # --------------------------------------------------------------------
    #
    def read(self, size=0, timeout=0, _force=False):
        """
        Read some data from the child.  By default, the method reads whatever
        is available on the next read, up to _CHUNKSIZE, but other read sizes
        can be specified.

        If the cache can serve the request (it holds data and `size` is 0, or
        it holds at least `size` characters), the data are returned right
        away.  Otherwise the method returns whatever data it has at timeout::

            timeout == 0: block until some data are available, return them
                          (possibly less than `size`).
            timeout <  0: return after the first read attempt, even if no
                          data have been available.
            timeout >  0: return what is available once `timeout` seconds
                          have passed.

        If no data are found, the method returns an empty string (not None).

        Note: carriage returns ('\\r') are stripped from the data.

        :param size:    number of characters wanted, 0 for "whatever comes".
        :param timeout: see above, in seconds.
        :param _force:  unused, kept for interface compatibility.
        :raises:        `se.NoSuccess` if reading from the child fails.
        """

        with self.rlock:

            found_eof = False

            try:
                # start the timeout timer right now.  Note that even if
                # timeout is short, and the poll is slow, we will
                # nevertheless attempt at least one read...
                start = time.time()

                # read until we have enough data, or hit timeout ceiling...
                while True:

                    # first, lets see if we still have data in the cache we
                    # can return
                    if self._cache_satisfies(size):
                        return self._take_cached(size)

                    # otherwise we need to read some more data.  Idle wait
                    # 'til the next data chunk arrives, or 'til _POLLDELAY
                    rlist, _, _ = select.select([self.parent_out], [], [],
                                                _POLLDELAY)

                    # got some data?
                    for f in rlist:

                        # read whatever we still need (the cache is known to
                        # hold less than `size` at this point)
                        readsize = _CHUNKSIZE
                        if size:
                            readsize = max(1, size - len(self.cache))

                        buf = os.read(f, readsize)

                        if len(buf) == 0 and sys.platform == 'darwin':
                            self.logger.debug("read : MacOS EOF")
                            self.finalize()
                            found_eof = True
                            raise se.NoSuccess("unexpected EOF: %s"
                                               % self.tail)

                        tmp = self._decode(buf).replace('\r', '')
                        self.cache += tmp

                        log = tmp.replace('\n', '\\n')
                        if len(log) > _DEBUG_MAX:
                            self.logger.debug(
                                    "read : [%5d] [%5d] (%s ... %s)"
                                    % (f, len(log), log[:30], log[-30:]))
                        else:
                            self.logger.debug(
                                    "read : [%5d] [%5d] (%s)"
                                    % (f, len(log), log))

                    # lets see if we now got enough data in the cache
                    if self._cache_satisfies(size):
                        return self._take_cached(size)

                    # at this point, we do not have sufficient data -- only
                    # return on timeout
                    if timeout == 0:
                        # only return if we have data
                        if self.cache:
                            return self._take_cached()

                    elif timeout < 0:
                        # return if we have data or not
                        return self._take_cached()

                    elif (time.time() - start) > timeout:
                        # timeout is reached
                        return self._take_cached()

            except Exception as e:

                if found_eof:
                    # the EOF error is reported as is
                    raise

                raise se.NoSuccess("read from process failed '%s' : (%s)"
                                   % (e, self.tail)) from e

    # ----------------------------------------------------------------
    #
    def _strip_ansi(self, txt):
        """
        Return `txt` with ANSI escape sequences (`ESC ... m`) removed.
        """

        return self._ANSI_ESCAPE.sub('', txt)

    # ----------------------------------------------------------------
    #
    def find(self, patterns, timeout=0):
        """
        This methods reads data from the child process until a string
        matching any of the given patterns is found.  If that is found, all
        read data are returned as a string, up to (and including) the match.
        The remainder of the data is kept in the cache.  Note that pattern
        can match an empty string, and the call then will return just that,
        an empty string.  If all patterns end with matching a newline, this
        method is effectively matching lines -- but note that '$' will also
        match the end of the (currently available) data stream.

        The call actually returns a tuple, containing the index of the
        matching pattern, and the string up to the match as described above.

        If no pattern matches, the call returns `(None, data)`, where `data`
        is the data inspected so far::

            timeout == 0: return after the first matching attempt; the
                          inspected data are consumed.
            timeout >  0: return once `timeout` seconds have passed; the
                          inspected data are also put back into the cache.
            timeout <  0: block until a match is found.

        Note that the pattern are interpreted with the re.M (multi-line) and
        re.S (dot matches all) regex flags.

        Performance: the call is doing repeated string regex searches over
        whatever data it finds.  On complex regexes, and large data, and
        small read buffers, this method can be expensive.

        Note: the returned data get carriage returns ('\\r') stripped.

        Note: ansi-escape sequences are stripped before matching, and are
        also absent from the returned and from the re-cached data.

        :param patterns: list of regular expression strings.
        :param timeout:  seconds, see above; `None` is treated as 0.
                         Fractional values are honoured.
        :raises:         whatever `ptye.translate_exception` produces if
                         reading from the child fails.
        """

        if timeout is None:
            timeout = 0

        # float, not int: int() would silently turn e.g. 0.5 into "no wait"
        timeout = float(timeout)

        with self.rlock:

            data = ""

            try:
                start      = time.time()   # startup timestamp
                data       = self.cache    # initial data to check
                self.cache = ""

                if not data:               # empty cache?
                    data = self.read(timeout=_POLLDELAY)

                # pre-compile the given pattern, to speed up matching
                patts = [re.compile(pattern, re.MULTILINE | re.DOTALL)
                         for pattern in patterns]

                # we wait forever -- there are two ways out though: data
                # matches a pattern, or timeout passes
                while True:

                    # skip non-lines
                    if not data:
                        data += self.read(timeout=_POLLDELAY)

                    escaped = self._strip_ansi(data)

                    # check current data for any matching pattern
                    for n, patt in enumerate(patts):

                        match = patt.search(escaped)

                        if match:
                            # a pattern matched the current data: return
                            # a tuple of pattern index and matching data.
                            # The remainder of the data is cached.
                            ret        = escaped[0:match.end()]
                            self.cache = escaped[match.end():]

                            return (n, ret.replace('\r', ''))

                    # if a timeout is given, and actually passed, return
                    # a non-match and a copy of the data we looked at
                    if timeout == 0:
                        return (None, str(escaped))

                    if timeout > 0:
                        if (time.time() - start) > timeout:
                            self.cache = escaped
                            return (None, str(escaped))

                    # no match yet, still time -- read more data
                    data += self.read(timeout=_POLLDELAY)

            except se.NoSuccess as e:
                raise ptye.translate_exception(e, "(%s)" % data) from e

    # ----------------------------------------------------------------
    #
    def write(self, data, nolog=False):
        """
        This method will repeatedly attempt to push the given data into the
        child's stdin pipe, until it succeeds to write all data.

        :param data:  `str` (sent UTF-8 encoded) or `bytes` (sent as is).
        :param nolog: if set, the data are masked in the log (see
                      :func:`_hide_data`).
        :raises:      whatever `ptye.translate_exception` produces if the
                      child is dead or the write fails.
        """

        with self.rlock:

            if not self.alive(recover=False):
                raise ptye.translate_exception(
                        se.NoSuccess("cannot write to dead process (%s) [%5s]"
                                     % (self.cache[-256:], self.parent_in)))

            try:
                # Encode exactly once: `os.write` reports *bytes* written, so
                # the remaining payload has to be tracked as bytes.  Slicing
                # the str by a byte count would drop characters as soon as
                # the data contain non-ASCII text.
                if isinstance(data, (bytes, bytearray)):
                    payload = bytes(data)
                    text    = payload.decode('utf-8', errors='replace')
                else:
                    payload = str.encode(data)
                    text    = data

                log = self._hide_data(text, nolog)
                log = log.replace('\n', '\\n')
                log = log.replace('\r', '')

                if len(log) > _DEBUG_MAX:
                    self.logger.debug("write: [%5d] [%5d] (%s ... %s)"
                                      % (self.parent_in, len(data),
                                         log[:30], log[-30:]))
                else:
                    self.logger.debug("write: [%5d] [%5d] (%s)"
                                      % (self.parent_in, len(data), log))

                # attempt to write forever -- until we succeed
                while payload:

                    # check if the pty pipe is ready for data
                    _, wlist, _ = select.select([], [self.parent_in], [],
                                                _POLLDELAY)

                    for f in wlist:

                        # write will report the number of written bytes
                        size = os.write(f, payload)

                        # truncate by written data, and try again
                        payload = payload[size:]

                        if payload:
                            self.logger.info("write: [%5d] [%5d]" % (f, size))

            except Exception as e:
                raise ptye.translate_exception(
                        e, "write failed (%s)" % e) from e
# ------------------------------------------------------------------------------