Skip to content

Instantly share code, notes, and snippets.

@okdtsk
Last active October 29, 2018 18:25
Show Gist options
  • Select an option

  • Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.

Select an option

Save okdtsk/898175ccde8a54bd3b0b7092718d28bd to your computer and use it in GitHub Desktop.
How to save memory for concurrent.futures.ProcessPoolExecutor
#!/usr/bin/env python
import memory_profiler
from typing import Any
from typing import Dict
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import TypeVar
import concurrent.futures
import functools
import logging
import time
# Root logger config: INFO so the Start/Finish test-case lines are visible.
logging.basicConfig(
    level=logging.INFO
)
# Module-level logger shared by every test case below.
_logger = logging.getLogger(__name__)
# Large payload (~1e6 ints, roughly 8 MB) returned by each job so that
# per-future result retention is clearly visible in the memory profiles.
BIG_OBJ = [1] * (10 ** 6)
# Item type flowing through split_iter and the executor's work queue.
T = TypeVar('T')
def testcase(name: str):
    """Decorator factory: log the start, finish, and wall-clock duration of a case.

    :param name: human-readable label inserted into both log lines.
    """
    def deco(func: Callable[..., Any]):
        @functools.wraps(func)
        def decorated_func(*args, **kwargs):
            start_time = time.time()
            _logger.info('Start loop of concurrent jobs w/ {}'.format(name))
            # Bug fix: propagate the wrapped function's return value instead
            # of silently discarding it.
            result = func(*args, **kwargs)
            finish_time = time.time()
            # Bug fix: '{:6.2}' means 2 *significant digits* (e.g. 123.4s
            # rendered as '1.2e+02'); '{:6.2f}' gives 2 decimal places.
            _logger.info('Finish loop of concurrent jobs w/ {} [Time: {:6.2f}s]'.format(
                name, finish_time - start_time))
            return result
        return decorated_func
    return deco
def split_iter(seq: 'Iterable[T]', n: int) -> 'Iterable[List[T]]':
    """Yield consecutive chunks of *seq*, each holding at most *n* items.

    Bug fix: the original never yielded a trailing partial chunk, so when
    len(seq) was not a multiple of *n* the last items were silently dropped
    (and their jobs never ran in case3).
    """
    sublist = []
    for v in seq:
        sublist.append(v)
        if len(sublist) == n:
            yield sublist
            sublist = []
    if sublist:  # emit the final partial chunk instead of dropping it
        yield sublist
def job(n: int):
    """Simulated worker task: ignores *n* and returns the shared BIG_OBJ list,
    so every finished future holds a large (~8 MB) result."""
    return BIG_OBJ
class ProcessPoolExecutorFutures(concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor that keeps at most *max_queue* futures alive.

    Instead of submitting every item up front (one Future -- and eventually
    one large result -- per item held in memory), items are drawn lazily
    from *it* and re-submitted each time a finished future is discarded,
    capping peak memory usage.
    """

    def __init__(self, fn: Callable, it: 'Iterable[T]',
                 max_workers: Optional[int] = None,
                 max_queue: Optional[int] = None):
        super().__init__(max_workers=max_workers)
        self.__fn = fn
        self.__it = iter(it)  # consumed lazily; never materialized as a list
        self.__max_queue = max_queue
        self.__futures = dict()  # type: Dict[concurrent.futures.Future, T]
        self.__fill_queue()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the future->item map so any retained results become collectable.
        del self.__futures
        super().__exit__(exc_type, exc_val, exc_tb)

    @property
    def fn(self):
        return self.__fn

    @property
    def it(self):
        return self.__it

    @property
    def max_queue(self):
        return self.__max_queue

    @property
    def futures(self) -> 'Dict[concurrent.futures.Future, T]':
        return self.__futures

    def as_completed(self):
        """Yield futures as they finish, topping up the queue after each batch.

        Bug fix: the original tested ``not f.running()``, which is also true
        for futures still PENDING in the queue, so unfinished futures were
        yielded early (the caller then blocked inside ``f.result()``), and
        the loop busy-spun while everything was RUNNING.
        ``concurrent.futures.wait`` blocks until at least one future is done.
        """
        while self.futures:
            done, _ = concurrent.futures.wait(
                self.futures, return_when=concurrent.futures.FIRST_COMPLETED)
            for f in done:
                yield f
            for f in done:
                self.__set_done_future(f)

    def __set_done_future(self, f: concurrent.futures.Future):
        # Forget the finished future (freeing its result) and refill the queue.
        del self.__futures[f]
        self.__fill_queue()

    def __fill_queue(self):
        # Submit items until max_queue futures are outstanding, or *it* is empty.
        for i in self.__it:
            self.__futures[self.submit(self.fn, i)] = i
            if self.max_queue is not None and len(self.futures) == self.max_queue:
                break
@testcase('simple concurrent.futures usage')
@memory_profiler.profile
def case1(parallel_num: int, loop_list: Iterable):
    """Baseline: submit every job up front; all futures and results stay alive."""
    total_len = 0
    with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as executor:
        pending = [executor.submit(job, item) for item in loop_list]
        for future in concurrent.futures.as_completed(pending):
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@testcase('ProcessPoolExecutorWrapper')
@memory_profiler.profile
def case2(parallel_num: int, loop_list: Iterable):
    """Same work as case1, but each future is discarded as soon as it finishes."""
    total_len = 0
    with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num) as executor:
        for future in executor.as_completed():
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@testcase('ProcessPoolExecutorWrapper & split list')
@memory_profiler.profile
def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    """Bound memory by running the jobs in chunks of split_n, one pool per chunk."""
    total_len = 0
    for chunk in split_iter(loop_list, split_n):
        with ProcessPoolExecutorFutures(job, chunk, max_workers=parallel_num) as executor:
            for future in executor.as_completed():
                total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@testcase('ProcessPoolExecutorWrapper & split list inside class')
@memory_profiler.profile
def case4(parallel_num: int, loop_list: Iterable, split_n: int):
    """Bound memory via the executor's own max_queue instead of pre-splitting."""
    total_len = 0
    with ProcessPoolExecutorFutures(job, loop_list, max_workers=parallel_num,
                                    max_queue=split_n) as executor:
        for future in executor.as_completed():
            total_len += len(future.result())
    _logger.info(' >> {}'.format(total_len))
@memory_profiler.profile
def main(parallel_num: int, loop_n: int, split_n: int):
    """Run every test case over the same range of loop_n job ids."""
    jobs = range(loop_n)
    case1(parallel_num, jobs)
    case2(parallel_num, jobs)
    case3(parallel_num, jobs, split_n)
    case4(parallel_num, jobs, split_n)
if __name__ == '__main__':
    import argparse

    # Typo fix in user-facing help text: 'snipet' -> 'snippet'.
    parser = argparse.ArgumentParser(
        description='Test snippet to save memory for concurrent.futures module')
    parser.add_argument('parallel_num', type=int, help='# of parallel worker process')
    parser.add_argument('loop_n', type=int, help='# of total loop')
    parser.add_argument('split_n', type=int, help='# of each concurrent jobs')
    parsed_args = parser.parse_args()
    main(parsed_args.parallel_num,
         parsed_args.loop_n,
         parsed_args.split_n)
INFO:__main__:Start loop of concurrent jobs w/ simple concurrent.futures usage
INFO:__main__:Finish loop of concurrent jobs w/ simple concurrent.futures usage
Filename: test/save_mem_concurrent.py
Line # Mem usage Increment Line Contents
================================================
57 38.2 MiB 0.0 MiB @memory_profiler.profile
58 def case1(parallel_num: int, loop_list: Iterable, split_n: int):
59 38.2 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
60 38.3 MiB 0.1 MiB with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as e:
61 38.7 MiB 0.4 MiB futures = [e.submit(job, i) for i in loop_list]
62 724.0 MiB 685.3 MiB for f in concurrent.futures.as_completed(futures):
63 724.0 MiB 0.0 MiB ret_val = f.result()
64 724.0 MiB -0.0 MiB _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')
INFO:__main__:Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper
INFO:__main__:Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper
Filename: test/save_mem_concurrent.py
Line # Mem usage Increment Line Contents
================================================
67 40.0 MiB 0.0 MiB @memory_profiler.profile
68 def case2(parallel_num: int, loop_list: Iterable, split_n: int):
69 40.0 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
70 40.1 MiB 0.1 MiB with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as e:
71 723.9 MiB 683.8 MiB for f in concurrent.futures.as_completed(e.futures):
72 723.9 MiB 0.0 MiB ret_val = f.result()
73 723.9 MiB 0.0 MiB e.set_done_future(f)
74 47.1 MiB -676.8 MiB _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
INFO:__main__:Start loop of concurrent jobs w/ dividing sublist and delete done future objs
INFO:__main__:Finish loop of concurrent jobs w/ dividing sublist and delete done future objs
Filename: test/save_mem_concurrent.py
Line # Mem usage Increment Line Contents
================================================
77 40.3 MiB 0.0 MiB @memory_profiler.profile
78 def case3(parallel_num: int, loop_list: Iterable, split_n: int):
79 40.3 MiB 0.0 MiB _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
80 47.2 MiB 6.9 MiB for sub_loop_list in split_iter(loop_list, split_n):
81 47.2 MiB 0.0 MiB with ProcessPoolExecutorWrapper(job, sub_loop_list, max_workers=parallel_num) as e:
82 109.1 MiB 61.9 MiB for f in concurrent.futures.as_completed(e.futures):
83 109.1 MiB 0.0 MiB ret_val = f.result()
84 109.1 MiB 0.0 MiB e.set_done_future(f)
85 47.2 MiB -61.9 MiB _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')
#!/usr/bin/env python
import memory_profiler
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import TypeVar
import concurrent.futures
import logging
# Root logger config: INFO so the Start/Finish log lines are visible.
logging.basicConfig(
    level=logging.INFO
)
# Module-level logger shared by every test case below.
_logger = logging.getLogger(__name__)
# Large payload (~1e6 ints, roughly 8 MB) returned by each job so that
# per-future result retention shows up clearly in the memory profiles.
BIG_OBJ = [1] * (10 ** 6)
# Item type flowing through split_iter / split_list.
T = TypeVar('T')
def split_iter(seq: 'Iterable[T]', n: int) -> 'Iterable[List[T]]':
    """Yield consecutive chunks of *seq*, each holding at most *n* items.

    Bug fix: the original never yielded a trailing partial chunk, so when
    len(seq) was not a multiple of *n* the last items were silently dropped.
    """
    sublist = []
    for v in seq:
        sublist.append(v)
        if len(sublist) == n:
            yield sublist
            sublist = []
    if sublist:  # emit the final partial chunk instead of dropping it
        yield sublist
def split_list(seq: 'List[T]', n: int) -> 'List[List[T]]':
    """Return *seq* chopped into consecutive sublists of at most *n* items."""
    return [seq[start:start + n] for start in range(0, len(seq), n)]
def job(n: int):
    """Simulated worker task: ignores *n* and returns the shared BIG_OBJ list,
    so every finished future holds a large (~8 MB) result."""
    return BIG_OBJ
class ProcessPoolExecutorWrapper(concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor that submits every item of *it* at construction
    and lets callers drop finished futures with set_done_future()."""

    def __init__(self, fn: Callable, it: Iterable, max_workers: Optional[int]=None):
        super().__init__(max_workers=max_workers)
        # Map each submitted future back to the item that produced it.
        self.futures = {self.submit(fn, item): item for item in it}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release the future map before the pool shuts down.
        del self.futures
        super().__exit__(exc_type, exc_val, exc_tb)

    def set_done_future(self, f: concurrent.futures.Future):
        """Forget *f* so its (possibly large) result can be garbage-collected."""
        del self.futures[f]
@memory_profiler.profile
def case1(parallel_num: int, loop_list: Iterable, split_n: int):
    """Baseline: submit everything up front; all futures stay alive until done."""
    _logger.info('Start loop of concurrent jobs w/ simple concurrent.futures usage')
    with concurrent.futures.ProcessPoolExecutor(max_workers=parallel_num) as executor:
        pending = [executor.submit(job, item) for item in loop_list]
        for future in concurrent.futures.as_completed(pending):
            ret_val = future.result()
    _logger.info('Finish loop of concurrent jobs w/ simple concurrent.futures usage')
@memory_profiler.profile
def case2(parallel_num: int, loop_list: Iterable, split_n: int):
    """Drop each future via set_done_future as soon as its result is consumed."""
    _logger.info('Start loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
    with ProcessPoolExecutorWrapper(job, loop_list, max_workers=parallel_num) as executor:
        for future in concurrent.futures.as_completed(executor.futures):
            ret_val = future.result()
            executor.set_done_future(future)
    _logger.info('Finish loop of concurrent jobs w/ ProcessPoolExecutorWrapper')
@memory_profiler.profile
def case3(parallel_num: int, loop_list: Iterable, split_n: int):
    """Run the jobs in chunks of split_n with a fresh pool per chunk."""
    _logger.info('Start loop of concurrent jobs w/ dividing sublist and delete done future objs')
    for chunk in split_iter(loop_list, split_n):
        with ProcessPoolExecutorWrapper(job, chunk, max_workers=parallel_num) as executor:
            for future in concurrent.futures.as_completed(executor.futures):
                ret_val = future.result()
                executor.set_done_future(future)
    _logger.info('Finish loop of concurrent jobs w/ dividing sublist and delete done future objs')
def main(parallel_num: int, loop_n: int, split_n: int):
    """Drive all three cases with the same range of job ids."""
    jobs = range(loop_n)
    case1(parallel_num, jobs, split_n)
    case2(parallel_num, jobs, split_n)
    case3(parallel_num, jobs, split_n)
if __name__ == '__main__':
    import argparse

    # Typo fix in user-facing help text: 'snipet' -> 'snippet'.
    parser = argparse.ArgumentParser(
        description='Test snippet to save memory for concurrent.futures module')
    parser.add_argument('parallel_num', type=int, help='# of parallel worker process')
    parser.add_argument('loop_n', type=int, help='# of total loop')
    parser.add_argument('split_n', type=int, help='# of each concurrent jobs')
    parsed_args = parser.parse_args()
    main(parsed_args.parallel_num,
         parsed_args.loop_n,
         parsed_args.split_n)
#!/bin/bash
# Run the memory comparison: 4 worker processes, 100 total jobs, chunks of 10.
python test/save_mem_concurrent.py 4 100 10
@Trueyellow
Copy link

Trueyellow commented Oct 1, 2018

Hi, your code is great and well-formatted with profiles. But I still have some questions about your ProcessPoolExecutorFutures. It looks like another implementation of concurrent.futures.ProcessPoolExecutor.map. Are there any differences between your executor's submit and map in terms of usage?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment