Skip to content

Instantly share code, notes, and snippets.

@okdtsk
Last active October 29, 2018 18:25
Show Gist options
  • Select an option

  • Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.

Select an option

Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.

Revisions

  1. okdtsk revised this gist Dec 20, 2016. 1 changed file with 174 additions and 0 deletions.
    174 changes: 174 additions & 0 deletions another_version.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,174 @@
    #!/usr/bin/env python

    import memory_profiler

    from typing import Any
    from typing import Dict
    from typing import Callable
    from typing import Iterable
    from typing import List
    from typing import Optional
    from typing import TypeVar

    import concurrent.futures
    import functools
    import logging
    import time

    # Show INFO records so each case's start/finish messages are printed.
    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger(__name__)

    # Payload returned by every job: a list holding one million ints.
    BIG_OBJ = [1] * 10 ** 6
    # Generic element type used in the annotations of this module.
    T = TypeVar('T')


    def testcase(name: str):
    def deco(func: Callable[..., Any]):
    @functools.wraps(func)
    def decorated_func(*args, **kwargs):
    start_time = time.time()
    _logger.info('Start loop of concurrent jobs w/ {}'.format(name))
    func(*args, **kwargs)
    finish_time = time.time()
    _logger.info('Finish loop of concurrent jobs w/ {} [Time: {:6.2}s]'.format(name, finish_time - start_time))
    return decorated_func
    return deco


    def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]:
        """Lazily split *seq* into consecutive lists of at most *n* items.

        Every yielded list holds exactly *n* items except possibly the last one,
        which carries whatever is left over.

        :param seq: any iterable; it is consumed once, in order.
        :param n: chunk size, must be at least 1.
        :raises ValueError: if *n* is smaller than 1 (raised on first iteration,
            because this is a generator).
        """
        if n < 1:
            raise ValueError('n must be a positive integer, got {!r}'.format(n))
        sublist = []
        for v in seq:
            sublist.append(v)
            if len(sublist) == n:
                yield sublist
                sublist = []
        # Flush the remainder: without this the trailing items were silently
        # dropped whenever the input length was not a multiple of n.
        if sublist:
            yield sublist


    def job(n: int):
        """Job body used by the test cases: ignore *n* and return ``BIG_OBJ``."""
        payload = BIG_OBJ
        return payload


    class ProcessPoolExecutorFutures(concurrent.futures.ProcessPoolExecutor):
        """Process pool that submits ``fn(item)`` for the items of an iterable.

        Submitted futures are tracked in ``futures`` (future -> input item).
        When ``max_queue`` is given, at most that many futures are tracked at a
        time; a further item is submitted each time :meth:`as_completed` hands
        out a finished future. With ``max_queue=None`` every item is submitted
        in the constructor.
        """

        def __init__(
            self,
            fn: Callable,
            it: Iterable[T],
            max_workers: Optional[int]=None,
            max_queue: Optional[int]=None,
        ):
            """
            :param fn: callable submitted once per item as ``fn(item)``.
            :param it: items to process; consumed lazily through one iterator.
            :param max_workers: forwarded to ``ProcessPoolExecutor``.
            :param max_queue: upper bound on tracked futures, or ``None`` for
                no bound.
            :raises ValueError: if ``max_queue`` is given but smaller than 1.
            """
            # A bound of 0 could never be reached by the "queue is full" check
            # and used to degrade silently into an unbounded queue.
            if max_queue is not None and max_queue < 1:
                raise ValueError('max_queue must be None or >= 1, got {!r}'.format(max_queue))
            super().__init__(max_workers=max_workers)
            self.__fn = fn
            self.__it = iter(it)
            self.__max_queue = max_queue
            self.__futures = dict()  # type: Dict[concurrent.futures.Future, T]
            self.__fill_queue()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Drop the references to any remaining futures but keep the
            # attribute, so the ``futures`` property stays usable after exit.
            self.__futures.clear()
            return super().__exit__(exc_type, exc_val, exc_tb)

        @property
        def fn(self):
            """Callable submitted for every item."""
            return self.__fn

        @property
        def it(self):
            """Iterator over the items that have not been submitted yet."""
            return self.__it

        @property
        def max_queue(self):
            """Upper bound on tracked futures, or ``None`` when unbounded."""
            return self.__max_queue

        @property
        def futures(self) -> Dict[concurrent.futures.Future, T]:
            """Mapping of currently tracked futures to their input items."""
            return self.__futures

        def as_completed(self):
            """Yield each tracked future once it has finished.

            A finished future is removed from ``futures`` and the queue is
            refilled before the future is yielded, so the generator ends once
            the input iterator is exhausted and every future was handed out.
            """
            while self.__futures:
                # Block until something finished instead of spinning; testing
                # ``not f.running()`` would also match futures still pending.
                done, _ = concurrent.futures.wait(
                    self.__futures,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                while done:
                    f = done.pop()
                    self.__set_done_future(f)
                    yield f
                    # Do not keep the finished future referenced from this
                    # generator while waiting for the next one.
                    del f

        def __set_done_future(self, f: concurrent.futures.Future):
            # Stop tracking the finished future and submit a replacement item.
            del self.__futures[f]
            self.__fill_queue()

        def __fill_queue(self):
            # Submit items until the iterator is exhausted or the bound is hit.
            limit = self.__max_queue
            for i in self.__it:
                self.__futures[self.submit(self.__fn, i)] = i
                if limit is not None and len(self.__futures) >= limit:
                    break


    @testcase('simple concurrent.futures usage')
    @memory_profiler.profile
    def case1(parallel_num: int, loop_list: Iterable):
        """Baseline: submit every job up front, then sum the result lengths."""
        total = 0
        with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as pool:
            pending = [pool.submit(job, item) for item in loop_list]
            for fut in concurrent.futures.as_completed(pending):
                payload = fut.result()
                total += len(payload)
        _logger.info(' >> {}'.format(total))


    @testcase('ProcessPoolExecutorWrapper')
    @memory_profiler.profile
    def case2(parallel_num: int, loop_list: Iterable):
        """Run all jobs through ProcessPoolExecutorFutures without a queue bound."""
        total = 0
        with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num) as pool:
            for fut in pool.as_completed():
                payload = fut.result()
                total += len(payload)
        _logger.info(' >> {}'.format(total))


    @testcase('ProcessPoolExecutorWrapper & split list')
    @memory_profiler.profile
    def case3(parallel_num: int, loop_list: Iterable, split_n: int):
        """Process the input chunk by chunk, using a fresh pool for every chunk."""
        total = 0
        for chunk in split_iter(loop_list, split_n):
            with ProcessPoolExecutorFutures(job, chunk, max_workers=parallel_num) as pool:
                for fut in pool.as_completed():
                    payload = fut.result()
                    total += len(payload)
        _logger.info(' >> {}'.format(total))


    @testcase('ProcessPoolExecutorWrapper & split list inside class')
    @memory_profiler.profile
    def case4(parallel_num: int, loop_list: Iterable, split_n: int):
        """Use a single pool and pass ``split_n`` as its ``max_queue`` bound."""
        total = 0
        with ProcessPoolExecutorFutures(
            job, loop_list, max_workers=parallel_num, max_queue=split_n
        ) as pool:
            for fut in pool.as_completed():
                payload = fut.result()
                total += len(payload)
        _logger.info(' >> {}'.format(total))


    @memory_profiler.profile
    def main(parallel_num: int, loop_n: int, split_n: int):
        """Run case1 to case4 in order over the same ``range(loop_n)`` input."""
        jobs = range(loop_n)

        case1(parallel_num, jobs)
        case2(parallel_num, jobs)
        case3(parallel_num, jobs, split_n)
        case4(parallel_num, jobs, split_n)


    if __name__ == '__main__':
        import argparse

        # Three positional integers, forwarded to main() in the same order.
        parser = argparse.ArgumentParser(description='Test snipet to save memory for concurrent.futures module')
        for arg_name, arg_help in (
            ('parallel_num', '# of parallel worker process'),
            ('loop_n', '# of total loop'),
            ('split_n', '# of each concurrent jobs'),
        ):
            parser.add_argument(arg_name, type=int, help=arg_help)
        parsed_args = parser.parse_args()
        main(parsed_args.parallel_num, parsed_args.loop_n, parsed_args.split_n)
  2. okdtsk created this gist Dec 20, 2016.
    47 changes: 47 additions & 0 deletions concurrent_save_mem.out
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,47 @@
    INFO:__main__:Start loop of concurrent jobs w/ simple concurrent.futures usage
    INFO:__main__:Finish loop of concurrent jobs w/ simple concurrent.futures usage
    Filename: test/save_mem_concurrent.py

    Line # Mem usage Increment Line Contents
    ================================================
    57 38.2 MiB 0.0 MiB @memory_profiler.profile
    58 def case1(parallel_num: int, loop_list: Iterable, split_n: int):
    59 38.2 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
    60 38.3 MiB 0.1 MiB with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e:
    61 38.7 MiB 0.4 MiB futures = [e.submit(job, i) for i in loop_list]
    62 724.0 MiB 685.3 MiB for f in concurrent.futures.as_completed(futures):
    63 724.0 MiB 0.0 MiB ret_val = f.result()
    64 724.0 MiB -0.0 MiB _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')


    INFO:__main__:Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper
    INFO:__main__:Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper
    Filename: test/save_mem_concurrent.py

    Line # Mem usage Increment Line Contents
    ================================================
    67 40.0 MiB 0.0 MiB @memory_profiler.profile
    68 def case2(parallel_num: int, loop_list: Iterable, split_n: int):
    69 40.0 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
    70 40.1 MiB 0.1 MiB with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as e:
    71 723.9 MiB 683.8 MiB for f in concurrent.futures.as_completed(e.futures):
    72 723.9 MiB 0.0 MiB ret_val = f.result()
    73 723.9 MiB 0.0 MiB e.set_done_future(f)
    74 47.1 MiB -676.8 MiB _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')


    INFO:__main__:Start loop of concurrent jobs w/ dividing sublist and delete done future objs
    INFO:__main__:Finish loop of concurrent jobs w/ dividing sublist and delete done future objs
    Filename: test/save_mem_concurrent.py

    Line # Mem usage Increment Line Contents
    ================================================
    77 40.3 MiB 0.0 MiB @memory_profiler.profile
    78 def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    79 40.3 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
    80 47.2 MiB 6.9 MiB for sub_loop_list in split_iter(loop_list, split_n):
    81 47.2 MiB 0.0 MiB with ProcessPoolExecutorWrapper(job, sub_loop_list, max_workers=parallel_num) as e:
    82 109.1 MiB 61.9 MiB for f in concurrent.futures.as_completed(e.futures):
    83 109.1 MiB 0.0 MiB ret_val = f.result()
    84 109.1 MiB 0.0 MiB e.set_done_future(f)
    85 47.2 MiB -61.9 MiB _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')
    107 changes: 107 additions & 0 deletions concurrent_save_mem.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,107 @@
    #!/usr/bin/env python

    import memory_profiler

    from typing import Callable
    from typing import Iterable
    from typing import List
    from typing import Optional
    from typing import TypeVar

    import concurrent.futures
    import logging

    # Show INFO records so each case's start/finish messages are printed.
    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger(__name__)

    # Payload returned by every job: a list holding one million ints.
    BIG_OBJ = [1] * 10 ** 6
    # Generic element type used in the annotations of this module.
    T = TypeVar('T')


    def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]:
        """Lazily split *seq* into consecutive lists of at most *n* items.

        Every yielded list holds exactly *n* items except possibly the last one,
        which carries whatever is left over.

        :param seq: any iterable; it is consumed once, in order.
        :param n: chunk size, must be at least 1.
        :raises ValueError: if *n* is smaller than 1 (raised on first iteration,
            because this is a generator).
        """
        if n < 1:
            raise ValueError('n must be a positive integer, got {!r}'.format(n))
        sublist = []
        for v in seq:
            sublist.append(v)
            if len(sublist) == n:
                yield sublist
                sublist = []
        # Flush the remainder: without this the trailing items were silently
        # dropped whenever the input length was not a multiple of n.
        if sublist:
            yield sublist


    def split_list(seq: List[T], n: int) -> List[List[T]]:
        """Cut *seq* into consecutive slices of length *n*; the last may be shorter."""
        chunks = []
        for start in range(0, len(seq), n):
            stop = start + n
            chunks.append(seq[start:stop])
        return chunks


    def job(n: int):
        """Job body used by the test cases: ignore *n* and return ``BIG_OBJ``."""
        payload = BIG_OBJ
        return payload


    class ProcessPoolExecutorWrapper(concurrent.futures.ProcessPoolExecutor):
        """Process pool that submits ``fn(item)`` for every item on construction.

        ``futures`` maps each submitted future to the item it was created from.
        """

        def __init__(self, fn: Callable, it: Iterable, max_workers: Optional[int]=None):
            super().__init__(max_workers=max_workers)
            submitted = {}
            for item in it:
                submitted[self.submit(fn, item)] = item
            self.futures = submitted

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Remove the future map first, then let the base class shut down.
            del self.futures
            super().__exit__(exc_type, exc_val, exc_tb)

        def set_done_future(self, f: concurrent.futures.Future):
            """Stop tracking *f*; raises ``KeyError`` if it is not tracked."""
            self.futures.pop(f)


    @memory_profiler.profile
    def case1(parallel_num: int, loop_list: Iterable, split_n: int):
        """Baseline: submit every job up front and fetch each result (``split_n`` unused)."""
        _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
        with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as pool:
            pending = [pool.submit(job, item) for item in loop_list]
            for fut in concurrent.futures.as_completed(pending):
                _ = fut.result()
        _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')


    @memory_profiler.profile
    def case2(parallel_num: int, loop_list: Iterable, split_n: int):
        """Fetch each result and untrack its future right away (``split_n`` unused)."""
        _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
        with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as pool:
            for fut in concurrent.futures.as_completed(pool.futures):
                _ = fut.result()
                pool.set_done_future(fut)
        _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')


    @memory_profiler.profile
    def case3(parallel_num: int, loop_list: Iterable, split_n: int):
        """Process the input in chunks of ``split_n``, one pool per chunk."""
        _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
        for chunk in split_iter(loop_list, split_n):
            with ProcessPoolExecutorWrapper(job, chunk, max_workers=parallel_num) as pool:
                for fut in concurrent.futures.as_completed(pool.futures):
                    _ = fut.result()
                    pool.set_done_future(fut)
        _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')



    def main(parallel_num: int, loop_n: int, split_n: int):
        """Run case1 to case3 in order over the same ``range(loop_n)`` input."""
        jobs = range(loop_n)

        case1(parallel_num, jobs, split_n)
        case2(parallel_num, jobs, split_n)
        case3(parallel_num, jobs, split_n)


    if __name__ == '__main__':
        import argparse

        # Three positional integers, forwarded to main() in the same order.
        parser = argparse.ArgumentParser(description='Test snipet to save memory for concurrent.futures module')
        for arg_name, arg_help in (
            ('parallel_num', '# of parallel worker process'),
            ('loop_n', '# of total loop'),
            ('split_n', '# of each concurrent jobs'),
        ):
            parser.add_argument(arg_name, type=int, help=arg_help)
        parsed_args = parser.parse_args()
        main(parsed_args.parallel_num, parsed_args.loop_n, parsed_args.split_n)
    3 changes: 3 additions & 0 deletions concurrent_save_mem.sh
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,3 @@
    #!/bin/bash

    # Run the memory comparison script with three positional integers.
    # NOTE(review): presumably these map to parallel_num=4, loop_n=100 and
    # split_n=10 of the Python script's CLI; confirm the script path as well.
    python test/save_mem_concurrent.py 4 100 10
    1 change: 1 addition & 0 deletions requirements.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1 @@
    memory_profiler