Last active
July 21, 2022 02:08
-
-
Save Nicolas-Constanty/bc288da59ea324384361b03fa0a7473e to your computer and use it in GitHub Desktop.
subprocess command streaming through pipe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import subprocess | |
| import time | |
| import asyncio | |
| from asyncio.subprocess import PIPE, STDOUT | |
| import sys | |
class SubprocessAPI:
    """Line-oriented bridge to a child process driven through pipes.

    The child is started with ``asyncio.create_subprocess_exec``. Its stdin is
    the read end of an ``os.pipe()`` (commands are written to the write end)
    and its stdout is an asyncio pipe from which replies are read one line at
    a time. The child is expected to print a line containing ``Ready`` once it
    can accept commands.

    The synchronous entry points (``__init__`` with ``auto_connect=True``,
    ``__enter__`` and ``send_command``) drive ``self.loop`` with
    ``run_until_complete`` and therefore cannot be used from inside a
    coroutine that is already running on that loop.
    """

    # Line the child must print on stdout to signal it accepts commands.
    _READY_LINE = 'Ready'
    # Handshake polling: number of lines to try and pause between tries (s).
    _HANDSHAKE_ATTEMPTS = 100
    _HANDSHAKE_PAUSE = 0.1

    def __init__(self, command, auto_connect: bool = False) -> None:
        """Store the command line and pick the event loop to use.

        Args:
            command: Sequence of program arguments; it is unpacked into
                ``asyncio.create_subprocess_exec`` by ``connect``.
            auto_connect: When true, start the child and perform the
                handshake before returning.
        """
        self.process = None
        self.write = None
        self.read = None
        self.subprocess_cmd = command
        self.connected = False
        self.loop = self._acquire_loop()
        if auto_connect:
            # connect() is a coroutine function: calling it bare only creates
            # a coroutine object that never runs, so drive it on the loop.
            self.loop.run_until_complete(self.connect())

    @staticmethod
    def _acquire_loop():
        """Return a usable event loop, creating and installing one if needed."""
        if sys.platform == "win32":
            # For subprocess' pipes on Windows
            loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(loop)
            return loop
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop (e.g. after asyncio.run(), or newer Pythons
            # that no longer create one implicitly).
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    def __del__(self):
        # getattr: __init__ may have failed before 'connected' was assigned.
        if getattr(self, "connected", False):
            self.disconnect()

    def __enter__(self):
        """Connect synchronously and return the bridge itself."""
        self.loop.run_until_complete(self.connect())
        return self

    def __exit__(self, type, value, traceback):
        """Disconnect on leaving the ``with`` block; exceptions propagate."""
        self.disconnect()

    def _close_pipe(self) -> None:
        """Close both ends of the command pipe and forget their descriptors."""
        for attr in ("write", "read"):
            fd = getattr(self, attr)
            # Reset first so a stale descriptor number is never closed twice.
            setattr(self, attr, None)
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError:
                # Descriptor was already closed elsewhere; nothing to release.
                pass

    def _release(self) -> None:
        """Close the command pipe, kill the child and mark as disconnected."""
        self.connected = False
        self._close_pipe()
        child = self.process
        if child is not None and child.returncode is None:
            try:
                child.kill()
            except ProcessLookupError:
                # The child exited on its own before we could kill it.
                pass

    def disconnect(self) -> None:
        """Tear down the bridge; a no-op (with a message) if not connected."""
        if not self.connected:
            print("Already disconnected")
            return
        self._release()
        print("Bridge disconnected")

    async def connect(self) -> bool:
        """Start the child process and wait for its ``Ready`` line.

        Reads up to 100 lines from the child's stdout (each read bounded by
        ``read_line``'s default timeout), pausing 0.1 s between attempts.
        If the handshake does not succeed, the pipe is closed and the child
        is killed so nothing is leaked.

        Returns:
            True when the bridge is connected, False otherwise.
        """
        if self.connected:
            print("Already connected")
            return self.connected

        self.read, self.write = os.pipe()
        try:
            self.process = await asyncio.create_subprocess_exec(
                *self.subprocess_cmd, stdin=self.read, stdout=PIPE
            )
        except BaseException:
            # Spawning failed: do not leak the freshly created pipe.
            self._close_pipe()
            raise

        for _ in range(self._HANDSHAKE_ATTEMPTS):
            line = await self.read_line()
            if line == self._READY_LINE:
                print('Bridge connected')
                self.connected = True
                break
            if self.process.stdout.at_eof():
                # The child closed stdout, so 'Ready' can never arrive.
                break
            # Non-blocking pause: time.sleep() would stall the event loop.
            await asyncio.sleep(self._HANDSHAKE_PAUSE)

        if not self.connected:
            self._release()
        return self.connected

    @staticmethod
    def __default_callback(message):
        """Default reply handler: print the reply."""
        print(message)

    def send_command(self, command: str, callback=__default_callback.__func__) -> "str | None":
        """Send one command line to the child and return its one-line reply.

        Args:
            command: Command text; ``os.linesep`` is appended before sending.
            callback: Called with the reply when not None. Defaults to
                printing the reply.

        Returns:
            The stripped reply line, or None when the bridge is disconnected
            or the read timed out.
        """
        if not self.connected:
            print("API disconnected, please use connect before.")
            return None
        payload = (command + os.linesep).encode()
        os.write(self.write, payload)
        line = self.loop.run_until_complete(self.read_line())
        if callback is not None:
            callback(line)
        return line

    async def read_line(self, timeout=10):
        """Read one line from the child's stdout.

        Args:
            timeout: Maximum number of seconds to wait for a line.

        Returns:
            The decoded line with surrounding whitespace stripped, or None
            (after printing ``Timeout``) when no line arrived in time.
        """
        try:
            raw = await asyncio.wait_for(self.process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            print('Timeout')
            return None
        return raw.decode().strip()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment