Skip to content

Instantly share code, notes, and snippets.

@daryl314
Created May 8, 2019 16:58
Show Gist options
  • Select an option

  • Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.

Select an option

Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.

Revisions

  1. daryl314 created this gist May 8, 2019.
    115 changes: 115 additions & 0 deletions BackgroundPty.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    import pty, sys, os, re, tty, select


    class ReadBuffer(object):
    """Buffer read operations"""

    def __init__(self, fd):
    if isinstance(fd, file):
    fd = fd.fileno()
    self.fd = fd
    self.escape = ''

    def read(self):
    c = os.read(self.fd, 1)
    if self.escape == '' and c != '\x1b':
    return c
    else:
    self.escape += c
    if self.completeEscape(self.escape):
    out = self.escape
    self.escape = ''
    return out

    @staticmethod
    def completeEscape(seq):
    """True if sequence is a complete escape sequence"""
    assert seq[0] == '\x1b'
    if len(seq) == 1:
    return False
    elif len(seq) == 2 and seq != '\x1b[':
    return True
    else:
    return re.match(r'[A-Za-z]', seq[-1])


    class Logger(object):
    """Log data flow"""

    def __init__(self, width=3, logger=sys.stderr):
    self.fmt = '\n[%0{}d %s] '.format(width)
    self.last_tag = None
    self.counter = 0
    self.logger = logger

    def log(self, tag, txt):

    # start a new line if tag changed or if text is an escape sequence
    if tag != self.last_tag or txt[0] == '\x1b':
    self.logger.write(self.fmt % (self.counter, tag))
    self.counter += 1

    # log text to current line
    self.logger.write(txt.__repr__()[1:-1])

    # start a new line if text ends with a newline; otherwise save tag to continue line
    if txt.endswith('\n') or txt.endswith('\n\r'):
    self.last_tag = None
    else:
    self.last_tag = tag


    class PtyHandler(object):
    """pseudo-terminal interactivity"""

    @classmethod
    def background(cls, callback, verbose=False):
    """Execute a callback function in a background pseudo-terminal"""
    (child_pid, pty_fd) = pty.fork()
    if child_pid == 0: # in child process with stdin/stdout/stderr mapped to pty
    tty.setraw(sys.stdin) # https://en.wikipedia.org/wiki/Terminal_mode
    callback()
    else: # in parent process
    (master, slave) = pty.openpty()
    tty.setraw(slave) # https://en.wikipedia.org/wiki/Terminal_mode
    print("Connect to pty: `screen {}`".format(os.ttyname(slave)))
    cls.intercept(
    master, # file descriptor for user-facing pseudo-terminal
    pty_fd, # file descriptor for background process
    sLF=True, # add line feed to slave newlines to make GNU screen happy
    sFilter={'\x1b[6n'}, # filter out slave "Query Cursor Position" requests
    verbose=verbose, # display debugging information?
    )

    @classmethod
    def intercept(cls, master_fd, slave_fd, verbose=False, mLF=False, sLF=False, mFilter=[], sFilter=[]):
    """Intercept traffic between two TTY's"""
    mBuf = ReadBuffer(master_fd)
    sBuf = ReadBuffer(slave_fd)
    logger = Logger() if verbose else None
    while True:
    rs, ws, es = select.select([master_fd, slave_fd], [], [])
    for r in rs:
    if r is slave_fd:
    cls.route(sBuf, master_fd, logger, 'S', addLF=sLF, filtered=sFilter)
    elif r is master_fd:
    cls.route(mBuf, slave_fd, logger, 'M', addLF=mLF, filtered=mFilter)

    @classmethod
    def route(cls, buf, out_fd, logger, tag, addLF=False, filtered=[]):
    """Read data from a ReadBuffer and route to an output file descriptor"""
    c = buf.read()
    if c is not None:
    if addLF:
    c = c.replace('\n', '\n\r')
    if logger is not None:
    logger.log(tag, c)
    if c not in filtered:
    os.write(out_fd, c)


    if __name__ == '__main__':
    def fn():
    x = 42 # put in scope of IPython
    __import__('IPython').embed()
    PtyHandler.background(fn, verbose=True)