#!/usr/bin/env python3
"""Run a subprocess under asyncio, echoing and capturing stdout/stderr."""

import asyncio
import collections
import sys


class AsyncSubProcess:
    """Spawn a program and collect its stdout/stderr concurrently.

    Args:
        program: Executable to run.
        *args: Command-line arguments passed to the program.
        env: Environment for the child. The default is an EMPTY environment;
            pass ``None`` to inherit the parent's environment.
        output: When truthy, each line is echoed to this process's
            stdout/stderr as it is read.
        cwd: Working directory for the child, or ``None`` for the current one.
    """

    # Maximum number of lines retained per stream; older lines are discarded.
    MAX_LINES = 50000

    def __init__(self, program, *args, env={}, output=True, cwd=None):
        self.proc = None
        self.program = program
        self.args = args
        # Copy so the shared default dict (or the caller's mapping) is never
        # aliased between instances; None is kept as "inherit parent env".
        self.env = None if env is None else dict(env)
        self.cwd = cwd
        self.callback_stdout = (lambda x: print(x, end="")) if output else None
        self.callback_stderr = (
            (lambda x: print(x, end="", file=sys.stderr)) if output else None
        )
        self.tasks = {}

    async def exec(self):
        """Start the child process with stdout and stderr connected to pipes."""
        self.proc = await asyncio.create_subprocess_exec(
            self.program,
            *self.args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self.env,
            cwd=self.cwd,
        )

    @staticmethod
    async def read_stream(stream, callback=None):
        """Read *stream* line by line until EOF.

        Each line is decoded as UTF-8 (undecodable bytes become U+FFFD instead
        of aborting the read) and passed to *callback* if one is given.

        Returns:
            A deque holding at most the last ``MAX_LINES`` decoded lines.
        """
        output = collections.deque(maxlen=AsyncSubProcess.MAX_LINES)
        while True:
            line = await stream.readline()
            if not line:
                # readline() returns b"" only at EOF; do not store it, so it
                # cannot evict a real line from the bounded deque.
                break
            text = line.decode("UTF-8", errors="replace")
            output.append(text)
            if callback:
                callback(text)
        return output

    def create_tasks(self):
        """Schedule one reader task per pipe; requires a running event loop."""
        self.tasks["stdout"] = asyncio.create_task(
            self.read_stream(self.proc.stdout, self.callback_stdout)
        )
        self.tasks["stderr"] = asyncio.create_task(
            self.read_stream(self.proc.stderr, self.callback_stderr)
        )

    async def wait(self):
        """Drain both pipes, wait for the child to exit, and return the result.

        Returns:
            A ``(returncode, stdout_text, stderr_text)`` tuple.
        """
        self.create_tasks()
        await asyncio.wait(list(self.tasks.values()))
        # EOF on the pipes does not imply the exit status has been collected
        # (the child may even close its pipes and keep running); without this
        # returncode could still be None.
        await self.proc.wait()
        return (
            self.proc.returncode,
            "".join(self.tasks["stdout"].result()),
            "".join(self.tasks["stderr"].result()),
        )

    @classmethod
    async def main(cls, *cmd, **kw):
        """Build a runner from the arguments, start it, and await its result."""
        runner = cls(*cmd, **kw)
        await runner.exec()
        return await runner.wait()


def run(*cmd, **kw):
    """Synchronously run a command; see ``AsyncSubProcess`` for the arguments.

    Returns:
        A ``(returncode, stdout_text, stderr_text)`` tuple.
    """
    return asyncio.run(AsyncSubProcess.main(*cmd, **kw))