Skip to content

Instantly share code, notes, and snippets.

@crimoniv
Last active June 1, 2023 01:53
Show Gist options
  • Select an option

  • Save crimoniv/0e3622f9d265f0e5459e747233b73b90 to your computer and use it in GitHub Desktop.

Select an option

Save crimoniv/0e3622f9d265f0e5459e747233b73b90 to your computer and use it in GitHub Desktop.
PuDB TTY PoC
#!/usr/bin/env python
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import errno
import os
import select
import signal
import socket
import struct
import sys
# Default Unix socket path: a file named ".pudb.sock" in the user's home
# directory. Used as the default value of the --socket option.
DEFAULT_SOCKET = os.path.join(os.path.expanduser('~'), '.pudb.sock')
# Usage examples; parse_args() passes this text to argparse as the help epilog.
DESCRIPTION = """\
example:
./pudb-client
./pudb-client --socket /foo/bar.sock
"""
class ArgumentParserFormatter(argparse.ArgumentDefaultsHelpFormatter,
                              argparse.RawDescriptionHelpFormatter):
    """Help formatter that mixes two stock argparse formatters.

    Inheriting from both means option help shows the default value and the
    description/epilog text is printed as written instead of re-wrapped.
    """
def parse_args():
    """Parse the client's command line and return the argparse namespace.

    The only option is ``-s/--socket``: the Unix socket path to connect to,
    defaulting to ``DEFAULT_SOCKET``.
    """
    cli = argparse.ArgumentParser(
        formatter_class=ArgumentParserFormatter,
        epilog=DESCRIPTION)
    cli.add_argument(
        '-s', '--socket',
        help='Unix socket path',
        default=DEFAULT_SOCKET)
    namespace = cli.parse_args()
    return namespace
def recvall(sock, size, max_bufsize=4096):
    """Read exactly ``size`` bytes from ``sock`` and return them as bytes.

    Each ``recv`` call asks for at most ``max_bufsize`` bytes. An empty
    read before ``size`` bytes have arrived raises ``Exception``.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(min(remaining, max_bufsize))
        if not chunk:
            # An empty read means the peer closed before sending everything.
            raise Exception('No data received')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)
def send_tty(sock, tty):
    """Send the TTY path ``tty`` over ``sock`` as a length-prefixed message.

    The wire format is a 4-byte big-endian unsigned length followed by the
    UTF-8 encoded path.
    """
    payload = tty.encode('utf-8')
    header = struct.pack('!I', len(payload))
    sock.sendall(header + payload)
def recv_pid(sock):
    """Read a 4-byte big-endian unsigned integer (the PID) from ``sock``."""
    raw = recvall(sock, 4)
    (pid,) = struct.unpack('!I', raw)
    return pid
def main():
    """Run the client: hand this terminal to the server, then relay Ctrl-C.

    Steps:
      1. Look up the TTY name behind stdout.
      2. Connect to the Unix socket, send the TTY path, read back a PID.
      3. Block forever; every KeyboardInterrupt is forwarded as SIGINT to
         that PID. When the PID no longer exists the session ends.
    """
    options = parse_args()
    tty_path = os.ttyname(sys.stdout.fileno())

    print('Connecting to %s...' % (options.socket,))
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        client.connect(options.socket)
    except socket.error as err:
        print('Unable to connect: %s' % (err,))
        sys.exit(1)

    # The socket is only needed for the handshake; close it right after.
    try:
        send_tty(client, tty_path)
        server_pid = recv_pid(client)
    finally:
        client.close()
    print('Connected!')

    session_open = True
    while session_open:
        try:
            # With no descriptors and no timeout this just blocks.
            select.select([], [], [])
        except KeyboardInterrupt:
            try:
                os.kill(server_pid, signal.SIGINT)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
                # ESRCH: no such process
                print('Session closed.')
                session_open = False


if __name__ == '__main__':
    main()
#!/usr/bin/env python
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import contextlib
import os
import socket
import stat
import struct
# Default Unix socket path: a file named ".pudb.sock" in the user's home
# directory. Used as the default value of the --socket option.
DEFAULT_SOCKET = os.path.join(os.path.expanduser('~'), '.pudb.sock')
# Usage examples; parse_args() passes this text to argparse as the help epilog.
# PYINTERP is the environment variable main() reads to choose the interpreter.
DESCRIPTION = """\
example:
./pudb-server --socket /foo/bar.sock script.py
./pudb-server --paused script.py
./pudb-server script.py
./pudb-server -- script.py --help
PYINTERP=python ./pudb-server script.py
"""
class ArgumentParserFormatter(argparse.ArgumentDefaultsHelpFormatter,
                              argparse.RawDescriptionHelpFormatter):
    """Help formatter that mixes two stock argparse formatters.

    Inheriting from both means option help shows the default value and the
    description/epilog text is printed as written instead of re-wrapped.
    """
def parse_args():
    """Parse the server's command line and return the argparse namespace.

    Options:
      -s/--socket  Unix socket path (defaults to ``DEFAULT_SOCKET``).
      -p/--paused  Break the debugger at the very beginning.

    Everything else is collected verbatim into ``args.args`` (the script to
    run followed by its own arguments).
    """
    parser = argparse.ArgumentParser(
        epilog=DESCRIPTION,
        formatter_class=ArgumentParserFormatter)
    parser.add_argument(
        '-s', '--socket', default=DEFAULT_SOCKET,
        help='Unix socket path')
    parser.add_argument(
        '-p', '--paused', action='store_true',
        help='Break the debugger at the very beginning. Otherwise, run '
             'until a breakpoint is hit.')
    parser.add_argument('args', nargs=argparse.REMAINDER)
    args = parser.parse_args()
    # Drop only a *leading* '--' (the separator between our options and the
    # script). A '--' appearing later belongs to the script's own arguments
    # and must be passed through untouched.
    if args.args[:1] == ['--']:
        del args.args[0]
    return args
def recvall(sock, size, max_bufsize=4096):
    """Read exactly ``size`` bytes from ``sock`` and return them as bytes.

    Each ``recv`` call asks for at most ``max_bufsize`` bytes. An empty
    read before ``size`` bytes have arrived raises ``Exception``.
    """
    received = bytearray()
    while True:
        missing = size - len(received)
        if missing <= 0:
            break
        piece = sock.recv(min(missing, max_bufsize))
        if not piece:
            # An empty read means the peer closed before sending everything.
            raise Exception('No data received')
        received.extend(piece)
    return bytes(received)
def recv_tty(sock):
    """Read a length-prefixed TTY path from ``sock`` and return it as text.

    The message is a 4-byte big-endian unsigned length followed by that many
    bytes of UTF-8.
    """
    header = recvall(sock, 4)
    (length,) = struct.unpack('!I', header)
    payload = recvall(sock, length)
    return payload.decode('utf-8')
def send_pid(sock, pid):
    """Send ``pid`` over ``sock`` as a 4-byte big-endian unsigned integer."""
    sock.sendall(struct.pack('!I', pid))
def istty(path):
    """Return True if ``path`` names an existing character device.

    Note that the check is only ``S_ISCHR``: any character device passes,
    not just terminals. Empty/None paths and paths that cannot be stat'ed
    yield False.
    """
    if not path:
        return False
    try:
        # Single stat call: avoids the exists()-then-stat() race where the
        # file disappears between the two calls and stat() raises.
        mode = os.stat(path).st_mode
    except (OSError, ValueError):
        # ValueError covers paths the OS layer rejects outright (e.g. an
        # embedded NUL), which os.path.exists() also reports as missing.
        return False
    return stat.S_ISCHR(mode)
@contextlib.contextmanager
def bind_socket(path, mode=0o600):
    """Context manager yielding a listening Unix stream socket bound to ``path``.

    Args:
        path: Filesystem path for the socket file.
        mode: Permission bits applied to the socket file (default 0o600).

    On exit the socket is closed and the socket file is removed, but only if
    this call actually created it: when ``bind()`` fails (e.g. the path is
    already taken) the existing file is left alone.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    bound = False
    try:
        sock.bind(path)
        bound = True
        # Honour the caller's ``mode`` and restrict permissions before the
        # socket starts accepting connections.
        os.chmod(path, mode)
        sock.listen(1)
        yield sock
    finally:
        sock.close()
        if bound:
            try:
                os.unlink(path)
            except OSError:
                # Already gone is fine; anything else is a real error.
                if os.path.exists(path):
                    raise
def main():
    """Run the server: wait for a client's TTY, then exec the target on it.

    Steps:
      1. Parse arguments and make sure a script was actually given.
      2. Remove any stale socket file, bind, and wait for one client.
      3. Receive the client's TTY path, verify it is a character device,
         and reply with this process's PID.
      4. Replace this process (``os.execvpe``) with the target command,
         passing the TTY path in the ``PUDB_TTY`` environment variable.
         ``exec`` keeps the PID, so the PID sent to the client stays valid.

    Raises:
        SystemExit: if no script was given on the command line.
        Exception: if the path sent by the client is not a character device.
    """
    args = parse_args()
    if not args.args:
        # Fail before binding/waiting; otherwise the missing script is only
        # noticed (as an IndexError) after a client has already connected.
        raise SystemExit('error: no script given (see --help)')

    # Remove a stale socket file left by a previous run, if any.
    try:
        os.unlink(args.socket)
    except OSError:
        if os.path.exists(args.socket):
            raise

    print('Listening on: %s' % (args.socket,))
    with bind_socket(args.socket) as sock:
        print('Waiting for client to connect...')
        conn, _ = sock.accept()
        try:
            tty = recv_tty(conn)
            if not istty(tty):
                raise Exception('Not a TTY: %s' % (tty,))
            pid = os.getpid()
            send_pid(conn, pid)
        finally:
            conn.close()
    # The listening socket is closed and its file removed before exec'ing,
    # since cleanup code cannot run once the process image is replaced.
    print('Client connected! (TTY: %s)' % (tty,))

    pyinterp = os.getenv('PYINTERP', 'python')
    if args.paused:
        # Running under "-m pudb" stops at the first line of the script.
        args.args[0:0] = [pyinterp, '-m', 'pudb']
    elif not os.access(args.args[0], os.X_OK):
        # Not directly executable: run it through the interpreter.
        args.args[0:0] = [pyinterp]
    env = dict(os.environ, PUDB_TTY=tty)
    os.execvpe(args.args[0], args.args, env)


if __name__ == '__main__':
    main()
@crimoniv
Copy link
Copy Markdown
Author

UPDATE:

  • Start in a non-breaking way unless --paused is set.
  • Check if target script is executable before prepending PYINTERP.

@axman6
Copy link
Copy Markdown

axman6 commented Jun 1, 2023

Hey, thank you for making this, it’s the closest I’ve got to having pudb working in a separate terminal from the app. When I run these, I get what looks like the appropriate log messages

$ pudb-client
Connecting to /home/alex/.pudb.sock...
Connected!
$ pudb-server `which moku` gui
Listening on: /home/alex/.pudb.sock
Waiting for client to connect...
Client connected! (TTY: /dev/pts/17)

however I don’t get the pudb gui at all. When I use the --paused flag, I get the errors mentioned in inducer/pudb#598. I’m guessing this isn’t your issue to fix, just a heads up that this isn’t working at the moment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment