Skip to content

Instantly share code, notes, and snippets.

@eriseven
Created November 1, 2019 11:25
Show Gist options
  • Select an option

  • Save eriseven/24bf1c61052a1b36ed68970251a60d20 to your computer and use it in GitHub Desktop.

Select an option

Save eriseven/24bf1c61052a1b36ed68970251a60d20 to your computer and use it in GitHub Desktop.

Revisions

  1. eriseven created this gist Nov 1, 2019.
    178 changes: 178 additions & 0 deletions async_await_with_subprocesses.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,178 @@
    """Async and await example using subprocesses
    Note:
    Requires Python 3.6.
    """

    import sys
    import time
    import platform
    import asyncio
    from pprint import pprint


    async def run_command(*args):
    """Run command in subprocess.
    Example from:
    http://asyncio.readthedocs.io/en/latest/subprocess.html
    """
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
    *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    # Status
    print("Started: %s, pid=%s" % (args, process.pid), flush=True)

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    # Progress
    if process.returncode == 0:
    print(
    "Done: %s, pid=%s, result: %s"
    % (args, process.pid, stdout.decode().strip()),
    flush=True,
    )
    else:
    print(
    "Failed: %s, pid=%s, result: %s"
    % (args, process.pid, stderr.decode().strip()),
    flush=True,
    )

    # Result
    result = stdout.decode().strip()

    # Return stdout
    return result


    async def run_command_shell(command):
    """Run command in subprocess (shell).
    Note:
    This can be used if you wish to execute e.g. "copy"
    on Windows, which can only be executed in the shell.
    """
    # Create subprocess
    process = await asyncio.create_subprocess_shell(
    command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    # Status
    print("Started:", command, "(pid = " + str(process.pid) + ")", flush=True)

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    # Progress
    if process.returncode == 0:
    print("Done:", command, "(pid = " + str(process.pid) + ")", flush=True)
    else:
    print(
    "Failed:", command, "(pid = " + str(process.pid) + ")", flush=True
    )

    # Result
    result = stdout.decode().strip()

    # Return stdout
    return result


    def make_chunks(l, n):
    """Yield successive n-sized chunks from l.
    Note:
    Taken from https://stackoverflow.com/a/312464
    """
    if sys.version_info.major == 2:
    for i in xrange(0, len(l), n):
    yield l[i : i + n]
    else:
    # Assume Python 3
    for i in range(0, len(l), n):
    yield l[i : i + n]


    def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results.
    If max_concurrent_tasks are set to 0, no limit is applied.
    Note:
    By default, Windows uses SelectorEventLoop, which does not support
    subprocesses. Therefore ProactorEventLoop is used on Windows.
    https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """
    all_results = []

    if max_concurrent_tasks == 0:
    chunks = [tasks]
    num_chunks = len(chunks)
    else:
    chunks = make_chunks(l=tasks, n=max_concurrent_tasks)
    num_chunks = len(list(make_chunks(l=tasks, n=max_concurrent_tasks)))

    if asyncio.get_event_loop().is_closed():
    asyncio.set_event_loop(asyncio.new_event_loop())
    if platform.system() == "Windows":
    asyncio.set_event_loop(asyncio.ProactorEventLoop())
    loop = asyncio.get_event_loop()

    chunk = 1
    for tasks_in_chunk in chunks:
    print(
    "Beginning work on chunk %s/%s" % (chunk, num_chunks), flush=True
    )
    commands = asyncio.gather(*tasks_in_chunk) # Unpack list using *
    results = loop.run_until_complete(commands)
    all_results += results
    print(
    "Completed work on chunk %s/%s" % (chunk, num_chunks), flush=True
    )
    chunk += 1

    loop.close()
    return all_results


    def main():
    """Main program."""
    start = time.time()

    if platform.system() == "Windows":
    # Commands to be executed on Windows
    commands = [["hostname"]]
    else:
    # Commands to be executed on Unix
    commands = [["du", "-sh", "/var/tmp"], ["hostname"]]

    tasks = []
    for command in commands:
    tasks.append(run_command(*command))

    # # Shell execution example
    # tasks = [run_command_shell('copy c:/somefile d:/new_file')]

    # # List comprehension example
    # tasks = [
    # run_command(*command, get_project_path(project))
    # for project in accessible_projects(all_projects)
    # ]

    results = run_asyncio_commands(
    tasks, max_concurrent_tasks=20
    ) # At most 20 parallel tasks
    print("Results:")
    pprint(results)

    end = time.time()
    rounded_end = "{0:.4f}".format(round(end - start, 4))
    print("Script ran in about %s seconds" % (rounded_end), flush=True)


    if __name__ == "__main__":
    main()