Last active
October 29, 2018 18:25
-
-
Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.
Revisions
-
okdtsk revised this gist
Dec 20, 2016 . 1 changed file with 174 additions and 0 deletions. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,174 @@ #!/usr/bin/env python import memory_profiler from typing import Any from typing import Dict from typing import Callable from typing import Iterable from typing import List from typing import Optional from typing import TypeVar import concurrent.futures import functools import logging import time logging.basicConfig( level=logging.INFO ) _logger = logging.getLogger(__name__) BIG_OBJ = [1] * (10 ** 6) T = TypeVar('T') def testcase(name: str): def deco(func: Callable[..., Any]): @functools.wraps(func) def decorated_func(*args, **kwargs): start_time = time.time() _logger.info('Start loop of concurrent jobs w/ {}'.format(name)) func(*args, **kwargs) finish_time = time.time() _logger.info('Finish loop of concurrent jobs w/ {} [Time: {:6.2}s]'.format(name, finish_time - start_time)) return decorated_func return deco def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]: sublist = [] for v in seq: sublist.append(v) if len(sublist) == n: yield sublist sublist = [] def job(n: int): return BIG_OBJ class ProcessPoolExecutorFutures(concurrent.futures.ProcessPoolExecutor): def __init__(self, fn: Callable, it: Iterable[T], max_workers: Optional[int]=None, max_queue: Optional[int]=None): super().__init__(max_workers=max_workers) self.__fn = fn self.__it = iter(it) self.__max_queue = max_queue self.__futures = dict() # type: Dict[concurrent.futures.Future, T] self.__fill_queue() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): del self.__futures super().__exit__(exc_type, exc_val, exc_tb) @property def fn(self): return self.__fn @property def it(self): return self.__it @property def max_queue(self): return 
self.__max_queue @property def futures(self) -> Dict[concurrent.futures.Future, T]: return self.__futures def as_completed(self): while self.futures: done_futures = [] for f in self.futures: if not f.running(): yield f done_futures.append(f) for f in done_futures: self.__set_done_future(f) def __set_done_future(self, f: concurrent.futures.Future): del self.__futures[f] self.__fill_queue() def __fill_queue(self): for i in self.__it: self.__futures[self.submit(self.fn, i)] = i if self.max_queue is not None and len(self.futures) == self.max_queue: break @testcase('simple concurrent.futures usage') @memory_profiler.profile def case1(parallel_num: int, loop_list: Iterable): len_sum = 0 with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e: futures = [e.submit(job, i) for i in loop_list] for f in concurrent.futures.as_completed(futures): ret_val = f.result() len_sum += len(ret_val) _logger.info(' >> {}'.format(len_sum)) return @testcase('ProcessPoolExecutorWrapper') @memory_profiler.profile def case2(parallel_num: int, loop_list: Iterable): len_sum = 0 with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num) as e: for f in e.as_completed(): ret_val = f.result() len_sum += len(ret_val) _logger.info(' >> {}'.format(len_sum)) @testcase('ProcessPoolExecutorWrapper & split list') @memory_profiler.profile def case3(parallel_num: int, loop_list: Iterable, split_n: int): len_sum = 0 for sub_loop_list in split_iter(loop_list, split_n): with ProcessPoolExecutorFutures(job, sub_loop_list, max_workers=parallel_num) as e: for f in e.as_completed(): ret_val = f.result() len_sum += len(ret_val) _logger.info(' >> {}'.format(len_sum)) @testcase('ProcessPoolExecutorWrapper & split list inside class') @memory_profiler.profile def case4(parallel_num: int, loop_list: Iterable, split_n: int): len_sum = 0 with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num, max_queue=split_n) as e: for f in e.as_completed(): ret_val = f.result() 
len_sum += len(ret_val) _logger.info(' >> {}'.format(len_sum)) @memory_profiler.profile def main(parallel_num: int, loop_n: int, split_n: int): loop_list = range(loop_n) case1(parallel_num, loop_list) case2(parallel_num, loop_list) case3(parallel_num, loop_list, split_n) case4(parallel_num, loop_list, split_n) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='Test snipet to save memory for concurrent.futures module') parser.add_argument('parallel_num', type=int, help='# of parallel worker process') parser.add_argument('loop_n', type=int, help='# of total loop') parser.add_argument('split_n', type=int, help='# of each concurrent jobs') parsed_args = parser.parse_args() main(parsed_args.parallel_num, parsed_args.loop_n, parsed_args.split_n) -
okdtsk created this gist
Dec 20, 2016 . There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
INFO:__main__:Start loop of concurrent jobs w/ simple concurrent.futures usage
INFO:__main__:Finish loop of concurrent jobs w/ simple concurrent.futures usage
Filename: test/save_mem_concurrent.py

Line #    Mem usage    Increment   Line Contents
================================================
    57     38.2 MiB      0.0 MiB   @memory_profiler.profile
    58                             def case1(parallel_num: int, loop_list: Iterable, split_n: int):
    59     38.2 MiB      0.0 MiB       _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
    60     38.3 MiB      0.1 MiB       with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e:
    61     38.7 MiB      0.4 MiB           futures = [e.submit(job, i) for i in loop_list]
    62    724.0 MiB    685.3 MiB           for f in concurrent.futures.as_completed(futures):
    63    724.0 MiB      0.0 MiB               ret_val = f.result()
    64    724.0 MiB     -0.0 MiB       _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')

INFO:__main__:Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper
INFO:__main__:Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper
Filename: test/save_mem_concurrent.py

Line #    Mem usage    Increment   Line Contents
================================================
    67     40.0 MiB      0.0 MiB   @memory_profiler.profile
    68                             def case2(parallel_num: int, loop_list: Iterable, split_n: int):
    69     40.0 MiB      0.0 MiB       _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
    70     40.1 MiB      0.1 MiB       with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as e:
    71    723.9 MiB    683.8 MiB           for f in concurrent.futures.as_completed(e.futures):
    72    723.9 MiB      0.0 MiB               ret_val = f.result()
    73    723.9 MiB      0.0 MiB               e.set_done_future(f)
    74     47.1 MiB   -676.8 MiB       _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')

INFO:__main__:Start loop of concurrent jobs w/ dividing sublist and delete done future objs
INFO:__main__:Finish loop of concurrent jobs w/ dividing sublist and delete done future objs
Filename: test/save_mem_concurrent.py

Line #    Mem usage    Increment   Line Contents
================================================
    77     40.3 MiB      0.0 MiB   @memory_profiler.profile
    78                             def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    79     40.3 MiB      0.0 MiB       _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
    80     47.2 MiB      6.9 MiB       for sub_loop_list in split_iter(loop_list, split_n):
    81     47.2 MiB      0.0 MiB           with ProcessPoolExecutorWrapper(job, sub_loop_list, max_workers=parallel_num) as e:
    82    109.1 MiB     61.9 MiB               for f in concurrent.futures.as_completed(e.futures):
    83    109.1 MiB      0.0 MiB                   ret_val = f.result()
    84    109.1 MiB      0.0 MiB                   e.set_done_future(f)
    85     47.2 MiB    -61.9 MiB       _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')

This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python
"""Benchmark memory usage of several ``concurrent.futures`` usage patterns.

Each test case submits ``job`` to a process pool once per loop item and
fetches every result.  The cases differ in how long references to finished
``Future`` objects are kept:

* ``case1`` keeps every future in a list until the pool is closed.
* ``case2`` deletes each future from the executor once it has been consumed.
* ``case3`` additionally processes the input in chunks, one pool per chunk.

Command-line arguments (all positional integers): ``parallel_num``,
``loop_n`` and ``split_n``.
"""
import concurrent.futures
import itertools
import logging
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import TypeVar

try:
    import memory_profiler
except ImportError:
    # The profiler is only needed for the per-line memory report; the cases
    # themselves run without it.
    memory_profiler = None

logging.basicConfig(
    level=logging.INFO
)
_logger = logging.getLogger(__name__)

BIG_OBJ = [1] * (10 ** 6)

T = TypeVar('T')


def _profile(func: Callable) -> Callable:
    """Wrap *func* with ``memory_profiler.profile`` when it is installed."""
    if memory_profiler is None:
        return func
    return memory_profiler.profile(func)


def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]:
    """Yield consecutive lists of at most *n* items taken from *seq*.

    The final list may be shorter than *n*; nothing is yielded for an empty
    input.

    :param seq: any iterable; it is consumed lazily.
    :param n: chunk size, must be at least 1.
    :raises ValueError: if *n* is smaller than 1 (raised on first iteration).
    """
    if n < 1:
        raise ValueError('n must be at least 1, got {}'.format(n))
    iterator = iter(seq)
    while True:
        sublist = list(itertools.islice(iterator, n))
        if not sublist:
            return
        yield sublist


def split_list(seq: List[T], n: int) -> List[List[T]]:
    """Return *seq* cut into slices of length *n* (the last may be shorter)."""
    return [seq[i:i+n] for i in range(0, len(seq), n)]


def job(n: int):
    """Return the module-level ``BIG_OBJ`` list; *n* is ignored."""
    return BIG_OBJ


class ProcessPoolExecutorWrapper(concurrent.futures.ProcessPoolExecutor):
    """Process pool that submits ``fn(item)`` for every item on creation.

    The created futures are kept in ``self.futures`` (future -> item) so the
    caller can delete each one with :meth:`set_done_future` once consumed.
    """

    def __init__(self,
                 fn: Callable,
                 it: Iterable,
                 max_workers: Optional[int]=None):
        """Create the pool and immediately submit one job per item of *it*.

        :param fn: callable submitted once per item.
        :param it: items to process.
        :param max_workers: forwarded to ``ProcessPoolExecutor``.
        """
        super().__init__(max_workers=max_workers)
        self.futures = {self.submit(fn, i): i for i in it}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the references to the tracked futures before shutting down.
        del self.futures
        return super().__exit__(exc_type, exc_val, exc_tb)

    def set_done_future(self, f: concurrent.futures.Future):
        """Forget the consumed future *f*."""
        del self.futures[f]


@_profile
def case1(parallel_num: int, loop_list: Iterable, split_n: int):
    """Baseline: submit everything and keep all futures in a list."""
    _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
    with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e:
        futures = [e.submit(job, i) for i in loop_list]
        for f in concurrent.futures.as_completed(futures):
            ret_val = f.result()
    _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')


@_profile
def case2(parallel_num: int, loop_list: Iterable, split_n: int):
    """Submit everything at once, deleting each future once consumed."""
    _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
    with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as e:
        for f in concurrent.futures.as_completed(e.futures):
            ret_val = f.result()
            e.set_done_future(f)
    _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')


@_profile
def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    """Process the input in chunks of *split_n*, one pool per chunk."""
    _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
    for sub_loop_list in split_iter(loop_list, split_n):
        with ProcessPoolExecutorWrapper(job, sub_loop_list, max_workers=parallel_num) as e:
            for f in concurrent.futures.as_completed(e.futures):
                ret_val = f.result()
                e.set_done_future(f)
    _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')


def main(parallel_num: int, loop_n: int, split_n: int):
    """Run all three test cases over ``range(loop_n)``."""
    loop_list = range(loop_n)
    case1(parallel_num, loop_list, split_n)
    case2(parallel_num, loop_list, split_n)
    case3(parallel_num, loop_list, split_n)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Test snipet to save memory for concurrent.futures module')
    parser.add_argument('parallel_num', type=int, help='# of parallel worker process')
    parser.add_argument('loop_n', type=int, help='# of total loop')
    parser.add_argument('split_n', type=int, help='# of each concurrent jobs')
    parsed_args = parser.parse_args()
    main(parsed_args.parallel_num, parsed_args.loop_n, parsed_args.split_n)

# Accompanying run script (a separate 3-line file in the same revision):
#   #!/bin/bash
#   python test/save_mem_concurrent.py 4 100 10
Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1 @@ memory_profiler