Created
November 1, 2019 11:25
-
-
Save eriseven/24bf1c61052a1b36ed68970251a60d20 to your computer and use it in GitHub Desktop.
Revisions
-
eriseven created this gist
Nov 1, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,178 @@ """Async and await example using subprocesses Note: Requires Python 3.6. """ import sys import time import platform import asyncio from pprint import pprint async def run_command(*args): """Run command in subprocess. Example from: http://asyncio.readthedocs.io/en/latest/subprocess.html """ # Create subprocess process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Status print("Started: %s, pid=%s" % (args, process.pid), flush=True) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Progress if process.returncode == 0: print( "Done: %s, pid=%s, result: %s" % (args, process.pid, stdout.decode().strip()), flush=True, ) else: print( "Failed: %s, pid=%s, result: %s" % (args, process.pid, stderr.decode().strip()), flush=True, ) # Result result = stdout.decode().strip() # Return stdout return result async def run_command_shell(command): """Run command in subprocess (shell). Note: This can be used if you wish to execute e.g. "copy" on Windows, which can only be executed in the shell. """ # Create subprocess process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Status print("Started:", command, "(pid = " + str(process.pid) + ")", flush=True) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Progress if process.returncode == 0: print("Done:", command, "(pid = " + str(process.pid) + ")", flush=True) else: print( "Failed:", command, "(pid = " + str(process.pid) + ")", flush=True ) # Result result = stdout.decode().strip() # Return stdout return result def make_chunks(l, n): """Yield successive n-sized chunks from l. Note: Taken from https://stackoverflow.com/a/312464 """ if sys.version_info.major == 2: for i in xrange(0, len(l), n): yield l[i : i + n] else: # Assume Python 3 for i in range(0, len(l), n): yield l[i : i + n] def run_asyncio_commands(tasks, max_concurrent_tasks=0): """Run tasks asynchronously using asyncio and return results. If max_concurrent_tasks are set to 0, no limit is applied. Note: By default, Windows uses SelectorEventLoop, which does not support subprocesses. Therefore ProactorEventLoop is used on Windows. https://docs.python.org/3/library/asyncio-eventloops.html#windows """ all_results = [] if max_concurrent_tasks == 0: chunks = [tasks] num_chunks = len(chunks) else: chunks = make_chunks(l=tasks, n=max_concurrent_tasks) num_chunks = len(list(make_chunks(l=tasks, n=max_concurrent_tasks))) if asyncio.get_event_loop().is_closed(): asyncio.set_event_loop(asyncio.new_event_loop()) if platform.system() == "Windows": asyncio.set_event_loop(asyncio.ProactorEventLoop()) loop = asyncio.get_event_loop() chunk = 1 for tasks_in_chunk in chunks: print( "Beginning work on chunk %s/%s" % (chunk, num_chunks), flush=True ) commands = asyncio.gather(*tasks_in_chunk) # Unpack list using * results = loop.run_until_complete(commands) all_results += results print( "Completed work on chunk %s/%s" % (chunk, num_chunks), flush=True ) chunk += 1 loop.close() return all_results def main(): """Main program.""" start = time.time() if platform.system() == "Windows": # Commands to be executed on Windows commands = [["hostname"]] else: # Commands to be executed on Unix commands = [["du", "-sh", "/var/tmp"], ["hostname"]] tasks = [] for command in commands: tasks.append(run_command(*command)) # # Shell execution example # tasks = [run_command_shell('copy c:/somefile d:/new_file')] # # List comprehension example # tasks = [ # run_command(*command, get_project_path(project)) # for project in accessible_projects(all_projects) # ] results = run_asyncio_commands( tasks, max_concurrent_tasks=20 ) # At most 20 parallel tasks print("Results:") pprint(results) end = time.time() rounded_end = "{0:.4f}".format(round(end - start, 4)) print("Script ran in about %s seconds" % (rounded_end), flush=True) if __name__ == "__main__": main()