Skip to content

Instantly share code, notes, and snippets.

@Nicolas-Constanty
Last active July 21, 2022 02:08
Show Gist options
  • Select an option

  • Save Nicolas-Constanty/bc288da59ea324384361b03fa0a7473e to your computer and use it in GitHub Desktop.

Select an option

Save Nicolas-Constanty/bc288da59ea324384361b03fa0a7473e to your computer and use it in GitHub Desktop.
subprocess command streaming through pipe
import os
import subprocess
import time
import asyncio
from asyncio.subprocess import PIPE, STDOUT
import sys
class SubprocessAPI:
    """Drive a child process through a line-based request/response protocol.

    The child is started with the read end of an ``os.pipe()`` as its stdin
    and an asyncio pipe as its stdout. The bridge counts as connected once
    the child prints a line that is exactly ``Ready``. After that, each
    command written to the pipe is answered by reading one line from the
    child's stdout.

    The synchronous methods drive ``self.loop`` with ``run_until_complete``,
    so they must not be called from a coroutine already running on that loop.

    Usable as a context manager: ``__enter__`` connects, ``__exit__``
    disconnects.
    """

    # Handshake budget: number of lines/timeouts tolerated before giving up,
    # and the pause (seconds) between two attempts.
    _HANDSHAKE_ATTEMPTS = 100
    _HANDSHAKE_DELAY = 0.1

    def __init__(self, command, auto_connect: bool = False) -> None:
        """Prepare the bridge and optionally connect right away.

        Args:
            command: Program and arguments, unpacked into
                ``asyncio.create_subprocess_exec``.
            auto_connect: When true, run ``connect()`` to completion before
                returning; check ``self.connected`` for the outcome.
        """
        self.process = None  # asyncio subprocess handle, set by connect()
        self.write = None  # parent-side write fd feeding the child's stdin
        self.read = None  # read fd handed to the child as its stdin
        self.subprocess_cmd = command
        self.connected = False
        if sys.platform == "win32":
            self.loop = asyncio.ProactorEventLoop()  # For subprocess' pipes on Windows
            asyncio.set_event_loop(self.loop)
        else:
            try:
                self.loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions no longer create a loop implicitly.
                self.loop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.loop)
        if auto_connect:
            # connect() is a coroutine: calling it without running it on the
            # loop would never actually start the child.
            self.loop.run_until_complete(self.connect())

    def __del__(self):
        # getattr guards against __init__ having failed before the attribute
        # was assigned.
        if getattr(self, "connected", False):
            self.disconnect()

    def __enter__(self):
        self.loop.run_until_complete(self.connect())
        return self

    def __exit__(self, type, value, traceback):
        # Parameter names are kept as-is for backward compatibility.
        self.disconnect()

    def _release(self) -> None:
        """Close both pipe ends and kill the child; safe to call repeatedly."""
        for end in ("write", "read"):
            fd = getattr(self, end)
            if fd is not None:
                # Forget the descriptor first so it can never be closed twice.
                setattr(self, end, None)
                os.close(fd)
        if self.process is not None:
            try:
                self.process.kill()
            except ProcessLookupError:
                # The child already exited; there is nothing left to kill.
                pass

    def disconnect(self) -> None:
        """Close the pipe, kill the child and mark the bridge disconnected."""
        if not self.connected:
            print("Already disconnected")
            return
        # Flip the flag before releasing so a failure during cleanup cannot
        # cause a second disconnect attempt from __del__.
        self.connected = False
        self._release()
        print("Bridge disconnected")

    async def connect(self) -> bool:
        """Start the child and wait for its ``Ready`` line.

        Returns:
            True when the child announced ``Ready`` (or the bridge was
            already connected), False otherwise. On failure the pipe and the
            child are released again.
        """
        if self.connected:
            print("Already connected")
            return self.connected
        self.process = None
        self.read, self.write = os.pipe()
        try:
            self.process = await asyncio.create_subprocess_exec(
                *self.subprocess_cmd, stdin=self.read, stdout=PIPE
            )
        except BaseException:
            # Spawning failed (or was cancelled): do not leak the pipe.
            self._release()
            raise
        for _ in range(self._HANDSHAKE_ATTEMPTS):
            line = await self.read_line()
            if line == 'Ready':
                print('Bridge connected')
                self.connected = True
                break
            if self.process.stdout.at_eof():
                # The child closed stdout; no 'Ready' can arrive anymore.
                break
            # Yield to the loop instead of blocking it with time.sleep().
            await asyncio.sleep(self._HANDSHAKE_DELAY)
        if not self.connected:
            self._release()
        return self.connected

    @staticmethod
    def __default_callback(message):
        """Default ``send_command`` callback: print the reply."""
        print(message)

    def send_command(self, command: str, callback=__default_callback.__func__) -> "str | None":
        """Send one command line to the child and return its one-line reply.

        Args:
            command: Text to send; ``os.linesep`` is appended before writing.
            callback: Called with the reply (which is None on timeout).
                Pass None to disable; the default prints the reply.

        Returns:
            The decoded, stripped reply line, or None when the bridge is not
            connected or the read timed out.
        """
        if not self.connected:
            print("API disconnected, please use connect before.")
            return None
        pending = memoryview((command + os.linesep).encode())
        while pending:
            # os.write may accept fewer bytes than offered; keep going.
            pending = pending[os.write(self.write, pending):]
        line = self.loop.run_until_complete(self.read_line())
        if callback is not None:
            callback(line)
        return line

    async def read_line(self, timeout=10):
        """Read one line from the child's stdout.

        Args:
            timeout: Seconds to wait for a line.

        Returns:
            The decoded line with surrounding whitespace stripped, or None
            if nothing arrived within ``timeout`` seconds.
        """
        try:
            line = await asyncio.wait_for(self.process.stdout.readline(), timeout)
            return line.decode().strip()
        except asyncio.TimeoutError:
            print('Timeout')
            return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment