Last active
October 29, 2018 18:25
-
-
Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.
How to save memory for concurrent.futures.ProcessPoolExecutor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import memory_profiler | |
| from typing import Any | |
| from typing import Dict | |
| from typing import Callable | |
| from typing import Iterable | |
| from typing import List | |
| from typing import Optional | |
| from typing import TypeVar | |
| import concurrent.futures | |
| import functools | |
| import logging | |
| import time | |
# Root logger configuration so the per-case INFO messages are visible on stderr.
logging.basicConfig(
    level=logging.INFO
)
_logger = logging.getLogger(__name__)
# Deliberately large job payload: a list of one million ints (several MB)
# returned by every worker, used to make memory growth observable.
BIG_OBJ = [1] * (10 ** 6)
# Generic element type used by split_iter and the executor wrapper annotations.
T = TypeVar('T')
def testcase(name: str):
    """Decorator factory that logs the start, finish, and wall-clock duration
    of the wrapped callable under the label *name*.

    Fixes over the previous version:
    - the wrapped function's return value is now propagated (it used to be
      silently discarded);
    - the elapsed time is formatted with ``{:6.2f}`` (fixed-point seconds);
      the old ``{:6.2}`` general format printed 2 *significant digits*,
      e.g. ``1.2e+02`` for a two-minute run.
    """
    def deco(func: Callable[..., Any]):
        @functools.wraps(func)
        def decorated_func(*args, **kwargs):
            start_time = time.time()
            _logger.info('Start loop of concurrent jobs w/ {}'.format(name))
            result = func(*args, **kwargs)
            finish_time = time.time()
            _logger.info('Finish loop of concurrent jobs w/ {} [Time: {:6.2f}s]'.format(name, finish_time - start_time))
            return result
        return decorated_func
    return deco
def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]:
    """Lazily yield successive lists of at most *n* items taken from *seq*.

    BUGFIX: the original implementation only yielded full chunks of exactly
    *n* items, silently dropping a trailing remainder when ``len(seq)`` was
    not a multiple of *n* — those jobs were never submitted. The final,
    shorter chunk is now yielded as well.
    """
    sublist = []
    for v in seq:
        sublist.append(v)
        if len(sublist) == n:
            yield sublist
            sublist = []
    if sublist:
        # Emit the trailing partial chunk instead of discarding it.
        yield sublist
def job(n: int):
    """Worker task: ignore *n* and return the shared large payload list.

    Returning BIG_OBJ ships ~1M ints back to the parent per job, which is
    what makes the memory profiles in the different cases comparable.
    """
    return BIG_OBJ
class ProcessPoolExecutorFutures(concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor that submits work lazily from an iterable and keeps
    at most ``max_queue`` futures alive at once.

    Finished futures are deleted as soon as the caller has consumed them via
    :meth:`as_completed`, and each deletion triggers submission of the next
    item(s) from the input iterable — bounding peak memory regardless of the
    total number of jobs.
    """

    def __init__(self, fn: Callable, it: Iterable[T], max_workers: Optional[int]=None, max_queue: Optional[int]=None):
        """:param fn: picklable callable executed in the worker processes
        :param it: iterable of single arguments passed to *fn*
        :param max_workers: forwarded to ProcessPoolExecutor
        :param max_queue: max number of live futures; ``None`` = unbounded
        """
        super().__init__(max_workers=max_workers)
        self.__fn = fn
        self.__it = iter(it)
        self.__max_queue = max_queue
        # Maps each live future back to the input item that produced it.
        self.__futures = dict()  # type: Dict[concurrent.futures.Future, T]
        self.__fill_queue()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop all future references (and their cached results) before the
        # pool shuts down, so nothing keeps the results alive.
        del self.__futures
        super().__exit__(exc_type, exc_val, exc_tb)

    @property
    def fn(self):
        return self.__fn

    @property
    def it(self):
        return self.__it

    @property
    def max_queue(self):
        return self.__max_queue

    @property
    def futures(self) -> Dict[concurrent.futures.Future, T]:
        return self.__futures

    def as_completed(self):
        """Yield futures as they finish, refilling the queue after each batch.

        BUGFIX: the previous implementation tested ``not f.running()``, which
        is also true for PENDING (not-yet-started) futures — those could be
        yielded before completion (blocking the consumer in ``result()``
        while already-finished futures sat unharvested), and the loop
        busy-spun when nothing was done. ``concurrent.futures.wait`` now
        blocks until at least one future has actually finished.
        """
        while self.futures:
            done, _ = concurrent.futures.wait(
                list(self.futures),
                return_when=concurrent.futures.FIRST_COMPLETED)
            for f in done:
                yield f
            # Only after the consumer has seen the finished futures do we
            # drop them and top the queue back up.
            for f in done:
                self.__set_done_future(f)

    def __set_done_future(self, f: concurrent.futures.Future):
        """Forget a finished future and submit replacement work."""
        del self.__futures[f]
        self.__fill_queue()

    def __fill_queue(self):
        """Submit items from the iterable until max_queue futures are live."""
        for i in self.__it:
            self.__futures[self.submit(self.fn, i)] = i
            if self.max_queue is not None and len(self.futures) == self.max_queue:
                break
@testcase('simple concurrent.futures usage')
@memory_profiler.profile
def case1(parallel_num: int, loop_list: Iterable):
    """Baseline: submit every job up front and keep all futures alive."""
    total_len = 0
    with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as executor:
        pending = [executor.submit(job, item) for item in loop_list]
        for future in concurrent.futures.as_completed(pending):
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
    return
@testcase('ProcessPoolExecutorWrapper')
@memory_profiler.profile
def case2(parallel_num: int, loop_list: Iterable):
    """Bounded memory: the executor drops each future once it is consumed."""
    total_len = 0
    with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num) as executor:
        for future in executor.as_completed():
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@testcase('ProcessPoolExecutorWrapper & split list')
@memory_profiler.profile
def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    """Bounded memory via external batching: one executor per sub-list."""
    total_len = 0
    for batch in split_iter(loop_list, split_n):
        with ProcessPoolExecutorFutures(job, batch, max_workers=parallel_num) as executor:
            for future in executor.as_completed():
                total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@testcase('ProcessPoolExecutorWrapper & split list inside class')
@memory_profiler.profile
def case4(parallel_num: int, loop_list: Iterable, split_n: int):
    """Bounded memory handled inside the executor via its max_queue limit."""
    total_len = 0
    with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num, max_queue=split_n) as executor:
        for future in executor.as_completed():
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@memory_profiler.profile
def main(parallel_num: int, loop_n: int, split_n: int):
    """Run all four memory-usage scenarios against the same job range."""
    jobs_input = range(loop_n)
    case1(parallel_num, jobs_input)
    case2(parallel_num, jobs_input)
    case3(parallel_num, jobs_input, split_n)
    case4(parallel_num, jobs_input, split_n)
if __name__ == '__main__':
    import argparse
    # CLI entry point: <parallel_num> workers run <loop_n> jobs total,
    # at most <split_n> jobs queued/batched at a time.
    # (typo fix in user-visible help text: "snipet" -> "snippet")
    parser = argparse.ArgumentParser(description='Test snippet to save memory for concurrent.futures module')
    parser.add_argument('parallel_num', type=int, help='# of parallel worker process')
    parser.add_argument('loop_n', type=int, help='# of total loop')
    parser.add_argument('split_n', type=int, help='# of each concurrent jobs')
    parsed_args = parser.parse_args()
    main(parsed_args.parallel_num,
         parsed_args.loop_n,
         parsed_args.split_n)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| INFO:__main__:Start loop of concurrent jobs w/ simple concurrent.futures usage | |
| INFO:__main__:Finish loop of concurrent jobs w/ simple concurrent.futures usage | |
| Filename: test/save_mem_concurrent.py | |
| Line # Mem usage Increment Line Contents | |
| ================================================ | |
| 57 38.2 MiB 0.0 MiB @memory_profiler.profile | |
| 58 def case1(parallel_num: int, loop_list: Iterable, split_n: int): | |
| 59 38.2 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage') | |
| 60 38.3 MiB 0.1 MiB with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e: | |
| 61 38.7 MiB 0.4 MiB futures = [e.submit(job, i) for i in loop_list] | |
| 62 724.0 MiB 685.3 MiB for f in concurrent.futures.as_completed(futures): | |
| 63 724.0 MiB 0.0 MiB ret_val = f.result() | |
| 64 724.0 MiB -0.0 MiB _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage') | |
| INFO:__main__:Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper | |
| INFO:__main__:Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper | |
| Filename: test/save_mem_concurrent.py | |
| Line # Mem usage Increment Line Contents | |
| ================================================ | |
| 67 40.0 MiB 0.0 MiB @memory_profiler.profile | |
| 68 def case2(parallel_num: int, loop_list: Iterable, split_n: int): | |
| 69 40.0 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper') | |
| 70 40.1 MiB 0.1 MiB with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as e: | |
| 71 723.9 MiB 683.8 MiB for f in concurrent.futures.as_completed(e.futures): | |
| 72 723.9 MiB 0.0 MiB ret_val = f.result() | |
| 73 723.9 MiB 0.0 MiB e.set_done_future(f) | |
| 74 47.1 MiB -676.8 MiB _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper') | |
| INFO:__main__:Start loop of concurrent jobs w/ dividing sublist and delete done future objs | |
| INFO:__main__:Finish loop of concurrent jobs w/ dividing sublist and delete done future objs | |
| Filename: test/save_mem_concurrent.py | |
| Line # Mem usage Increment Line Contents | |
| ================================================ | |
| 77 40.3 MiB 0.0 MiB @memory_profiler.profile | |
| 78 def case3(parallel_num: int, loop_list: Iterable, split_n: int): | |
| 79 40.3 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs') | |
| 80 47.2 MiB 6.9 MiB for sub_loop_list in split_iter(loop_list, split_n): | |
| 81 47.2 MiB 0.0 MiB with ProcessPoolExecutorWrapper(job, sub_loop_list, max_workers=parallel_num) as e: | |
| 82 109.1 MiB 61.9 MiB for f in concurrent.futures.as_completed(e.futures): | |
| 83 109.1 MiB 0.0 MiB ret_val = f.result() | |
| 84 109.1 MiB 0.0 MiB e.set_done_future(f) | |
| 85 47.2 MiB -61.9 MiB _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import memory_profiler | |
| from typing import Callable | |
| from typing import Iterable | |
| from typing import List | |
| from typing import Optional | |
| from typing import TypeVar | |
| import concurrent.futures | |
| import logging | |
# Root logger configuration so the per-case INFO messages are visible on stderr.
logging.basicConfig(
    level=logging.INFO
)
_logger = logging.getLogger(__name__)
# Deliberately large job payload: a list of one million ints (several MB)
# returned by every worker, used to make memory growth observable.
BIG_OBJ = [1] * (10 ** 6)
# Generic element type used by the split helpers' annotations.
T = TypeVar('T')
def split_iter(seq: Iterable[T], n: int) -> Iterable[List[T]]:
    """Lazily yield successive lists of at most *n* items taken from *seq*.

    BUGFIX: the original implementation only yielded full chunks of exactly
    *n* items, silently dropping a trailing remainder when ``len(seq)`` was
    not a multiple of *n* — those jobs were never submitted. The final,
    shorter chunk is now yielded as well.
    """
    sublist = []
    for v in seq:
        sublist.append(v)
        if len(sublist) == n:
            yield sublist
            sublist = []
    if sublist:
        # Emit the trailing partial chunk instead of discarding it.
        yield sublist
def split_list(seq: List[T], n: int) -> List[List[T]]:
    """Split *seq* into consecutive chunks of size *n* (last may be shorter)."""
    chunks = []
    for start in range(0, len(seq), n):
        chunks.append(seq[start:start + n])
    return chunks
def job(n: int):
    """Worker task: ignore *n* and return the shared large payload list.

    Returning BIG_OBJ ships ~1M ints back to the parent per job, which is
    what makes the memory profiles in the different cases comparable.
    """
    return BIG_OBJ
class ProcessPoolExecutorWrapper(concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor that records its submitted futures so finished
    ones can be released early.

    All work items from *it* are submitted eagerly in the constructor;
    callers iterate ``concurrent.futures.as_completed(self.futures)`` and
    call :meth:`set_done_future` after consuming each result so the future
    (and its cached result) can be garbage collected.
    """

    def __init__(self, fn: Callable, it: Iterable, max_workers: Optional[int]=None):
        super().__init__(max_workers=max_workers)
        # Map each submitted future back to the input item that produced it.
        self.futures = {self.submit(fn, item): item for item in it}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop every future reference before shutting the pool down.
        del self.futures
        super().__exit__(exc_type, exc_val, exc_tb)

    def set_done_future(self, f: concurrent.futures.Future):
        # Forget a completed future so its result can be reclaimed.
        del self.futures[f]
@memory_profiler.profile
def case1(parallel_num: int, loop_list: Iterable, split_n: int):
    """Baseline: submit everything up front and keep every future alive."""
    _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
    with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as executor:
        pending = [executor.submit(job, item) for item in loop_list]
        for future in concurrent.futures.as_completed(pending):
            ret_val = future.result()
    _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')
@memory_profiler.profile
def case2(parallel_num: int, loop_list: Iterable, split_n: int):
    """Release each future via set_done_future as soon as it is consumed."""
    _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
    with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as executor:
        for future in concurrent.futures.as_completed(executor.futures):
            ret_val = future.result()
            executor.set_done_future(future)
    _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
@memory_profiler.profile
def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    """Batch the input into sub-lists and release each future when done."""
    _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
    for batch in split_iter(loop_list, split_n):
        with ProcessPoolExecutorWrapper(job, batch, max_workers=parallel_num) as executor:
            for future in concurrent.futures.as_completed(executor.futures):
                ret_val = future.result()
                executor.set_done_future(future)
    _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')
def main(parallel_num: int, loop_n: int, split_n: int):
    """Run all three memory-usage scenarios over the same job range."""
    jobs_input = range(loop_n)
    case1(parallel_num, jobs_input, split_n)
    case2(parallel_num, jobs_input, split_n)
    case3(parallel_num, jobs_input, split_n)
if __name__ == '__main__':
    import argparse
    # CLI entry point: <parallel_num> workers run <loop_n> jobs total,
    # in batches of <split_n> concurrent jobs.
    # (typo fix in user-visible help text: "snipet" -> "snippet")
    parser = argparse.ArgumentParser(description='Test snippet to save memory for concurrent.futures module')
    parser.add_argument('parallel_num', type=int, help='# of parallel worker process')
    parser.add_argument('loop_n', type=int, help='# of total loop')
    parser.add_argument('split_n', type=int, help='# of each concurrent jobs')
    parsed_args = parser.parse_args()
    main(parsed_args.parallel_num,
         parsed_args.loop_n,
         parsed_args.split_n)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Demo run: 4 worker processes, 100 jobs in total, 10 concurrent jobs per batch.
python test/save_mem_concurrent.py 4 100 10
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| memory_profiler |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, your code is great and well-formatted with profiles. But I still have some questions about your ProcessPoolExecutorFutures. It looks like another implementation of concurrent.futures.ProcessPoolExecutor.map. Are there any differences between your executor's submit and map in terms of usage?