Created
May 8, 2019 16:58
-
-
Save daryl314/414ced790bddf8f93e682cd94732398e to your computer and use it in GitHub Desktop.
Revisions
-
daryl314 created this gist
May 8, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,115 @@ import pty, sys, os, re, tty, select class ReadBuffer(object): """Buffer read operations""" def __init__(self, fd): if isinstance(fd, file): fd = fd.fileno() self.fd = fd self.escape = '' def read(self): c = os.read(self.fd, 1) if self.escape == '' and c != '\x1b': return c else: self.escape += c if self.completeEscape(self.escape): out = self.escape self.escape = '' return out @staticmethod def completeEscape(seq): """True if sequence is a complete escape sequence""" assert seq[0] == '\x1b' if len(seq) == 1: return False elif len(seq) == 2 and seq != '\x1b[': return True else: return re.match(r'[A-Za-z]', seq[-1]) class Logger(object): """Log data flow""" def __init__(self, width=3, logger=sys.stderr): self.fmt = '\n[%0{}d %s] '.format(width) self.last_tag = None self.counter = 0 self.logger = logger def log(self, tag, txt): # start a new line if tag changed or if text is an escape sequence if tag != self.last_tag or txt[0] == '\x1b': self.logger.write(self.fmt % (self.counter, tag)) self.counter += 1 # log text to current line self.logger.write(txt.__repr__()[1:-1]) # start a new line if text ends with a newline; otherwise save tag to continue line if txt.endswith('\n') or txt.endswith('\n\r'): self.last_tag = None else: self.last_tag = tag class PtyHandler(object): """pseudo-terminal interactivity""" @classmethod def background(cls, callback, verbose=False): """Execute a callback function in a background pseudo-terminal""" (child_pid, pty_fd) = pty.fork() if child_pid == 0: # in child process with stdin/stdout/stderr mapped to pty tty.setraw(sys.stdin) # https://en.wikipedia.org/wiki/Terminal_mode callback() else: # in parent process (master, slave) = pty.openpty() tty.setraw(slave) # https://en.wikipedia.org/wiki/Terminal_mode print("Connect to pty: `screen {}`".format(os.ttyname(slave))) cls.intercept( master, # file descriptor for user-facing pseudo-terminal pty_fd, # file descriptor for background process sLF=True, # add line feed to slave newlines to make GNU screen happy sFilter={'\x1b[6n'}, # filter out slave "Query Cursor Position" requests verbose=verbose, # display debugging information? ) @classmethod def intercept(cls, master_fd, slave_fd, verbose=False, mLF=False, sLF=False, mFilter=[], sFilter=[]): """Intercept traffic between two TTY's""" mBuf = ReadBuffer(master_fd) sBuf = ReadBuffer(slave_fd) logger = Logger() if verbose else None while True: rs, ws, es = select.select([master_fd, slave_fd], [], []) for r in rs: if r is slave_fd: cls.route(sBuf, master_fd, logger, 'S', addLF=sLF, filtered=sFilter) elif r is master_fd: cls.route(mBuf, slave_fd, logger, 'M', addLF=mLF, filtered=mFilter) @classmethod def route(cls, buf, out_fd, logger, tag, addLF=False, filtered=[]): """Read data from a ReadBuffer and route to an output file descriptor""" c = buf.read() if c is not None: if addLF: c = c.replace('\n', '\n\r') if logger is not None: logger.log(tag, c) if c not in filtered: os.write(out_fd, c) if __name__ == '__main__': def fn(): x = 42 # put in scope of IPython __import__('IPython').embed() PtyHandler.background(fn, verbose=True)