Using Concurrent Futures Without Running Out of RAM
Solution 1:
I'll take a shot (might be a wrong guess...)
You might need to submit your work bit by bit, since each submit makes a copy of parser_variables, which may end up chewing through your RAM.
Here is working code with "<----" marking the interesting parts:
with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain the future info in the key, and the filename in the value
    jobs = {}

    # Loop through the files and run the parse function for each file, sending the filename to it.
    # The results can come back in any order.
    files_left = len(files_list)   # <----
    files_iter = iter(files_list)  # <----

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break  # limit the job submission for now

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):
            files_left -= 1  # one down - many to go...   <----

            # Get the file the job is based on (jobs[job]) and its result (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break  # give a chance to add more jobs  <----
Solution 2:
Try adding del to your code like this:
for job in futures.as_completed(jobs):
    del jobs[job]  # or `val = jobs.pop(job)`
    # del job  # or `job._result = None`
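A minimal sketch of how that del fits into a full submit/consume loop; parse_function, parser_variables, files_list, and post_process are names assumed from the question, not part of this answer:

from concurrent import futures

with futures.ProcessPoolExecutor() as executor:
    # Map each future to its input file so it can be recovered later.
    jobs = {executor.submit(parse_function, f, **parser_variables): f
            for f in files_list}

    for job in futures.as_completed(jobs):
        this_file = jobs.pop(job)    # drop the dict's reference to the future
        results_list = job.result()  # grab the result before discarding the future
        post_process(this_file, results_list)
        # `job` is rebound on the next iteration, so the completed future
        # (and the result it holds) can be garbage collected.

Popping from jobs here is safe because as_completed() copies the futures into its own set before iterating.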
Solution 3:
Same problem for me.
In my case I needed to start millions of threads. With Python 2 I would have written a thread pool myself using a dict, but in Python 3 I encountered the following error when I deleted finished threads from the dict dynamically:
RuntimeError: dictionary changed size during iteration
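For reference, this error is raised by any dict whose size changes while it is being iterated; a minimal reproduction, independent of threading:

threads = {1: 'worker-1', 2: 'worker-2'}
for t in threads:
    del threads[t]  # RuntimeError: dictionary changed size during iteration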
So I had to use concurrent.futures. At first I coded it like this:
from concurrent.futures import ThreadPoolExecutor
...

if __name__ == '__main__':
    all_resources = get_all_resources()
    with ThreadPoolExecutor(max_workers=50) as pool:
        for r in all_resources:
            pool.submit(handle_resource, r)
But memory was soon exhausted, because it is only released after all the threads have finished; I needed to discard finished threads before too many new ones were started. So I read the docs here: https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
I found that Executor.shutdown(wait=True) might be what I needed. This is my final solution:
from concurrent.futures import ThreadPoolExecutor
...

if __name__ == '__main__':
    all_resources = get_all_resources()
    i = 0
    while i < len(all_resources):
        # Each `with` block waits for its batch of 1000 to finish,
        # so the futures from one batch are freed before the next starts.
        with ThreadPoolExecutor(max_workers=50) as pool:
            for r in all_resources[i:i + 1000]:
                pool.submit(handle_resource, r)
        i += 1000
From the docs: "You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)."
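For illustration, here is the same batching loop written with an explicit shutdown call instead of the with statement; a sketch only, reusing the handle_resource and get_all_resources names from above:

from concurrent.futures import ThreadPoolExecutor

all_resources = get_all_resources()
i = 0
while i < len(all_resources):
    pool = ThreadPoolExecutor(max_workers=50)
    for r in all_resources[i:i + 1000]:
        pool.submit(handle_resource, r)
    # Block until this batch finishes so its futures can be freed;
    # leaving a `with` block does exactly this implicitly.
    pool.shutdown(wait=True)
    i += 1000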
Solution 4:
Looking at the concurrent.futures.as_completed() function, I learned that it is enough to ensure there is no longer any reference to the future. If you dispense with this reference as soon as you've got the result, you'll minimise memory usage.

I use a generator expression to store my Future instances, because everything I care about is already returned by the future in its result (basically, the status of the dispatched work). Other implementations use a dict, as in your case, because the input filename is not returned as part of the thread worker's result.

Using a generator expression means that once the result is yielded, there is no longer any reference to the Future. Internally, as_completed() has already taken care of removing its own reference after it yielded the completed Future to you.
futures = (executor.submit(thread_worker, work) for work in workload)
for future in concurrent.futures.as_completed(futures):
    output = future.result()
    ...  # on next loop iteration, garbage will be collected for the result data, too
Edit: Simplified from using a set and removing entries, to simply using a generator expression.
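For completeness, a self-contained sketch of the generator-expression pattern; thread_worker and the workload here are placeholders, not part of the original answer:

import concurrent.futures

def thread_worker(work):
    # Placeholder worker: return only the status/result you care about.
    return work * 2

workload = range(10_000)

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # No collection of futures is kept on the caller's side.
    futures = (executor.submit(thread_worker, work) for work in workload)
    for future in concurrent.futures.as_completed(futures):
        output = future.result()
        # `future` and `output` are rebound on each iteration, so every
        # completed future and its result become garbage-collectible.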