No Performance Gain After Using Multiprocessing For A Queue-oriented Function
Solution 1:
TL;DR
If your real algorithm doesn't involve costlier calculations than you showed us in your example, the communication overhead for multiprocessing will dominate and make your computation take many times longer than sequential execution.
Your attempt with apply_async actually uses just one worker of your pool, which is why you don't see a difference; apply_async feeds only one worker at a time by design, as the short sketch below illustrates. Further, it's not enough to just pass the serial version into the pool: if your workers need to share intermediate results, you will have to modify your target function to enable that.
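For illustration, here is a minimal, hypothetical sketch (serial_work is a stand-in for your serial function, not code from the question) showing that a single apply_async call occupies only one worker, no matter how large the pool is:

import os
import time
from multiprocessing import Pool


def serial_work(n):
    # stand-in for the serial algorithm; returns the PID of the worker that ran it
    time.sleep(0.1)
    return os.getpid()


if __name__ == '__main__':
    with Pool(4) as pool:
        # one apply_async call -> one task -> one worker
        single = pool.apply_async(serial_work, (10,))
        print(single.get())  # a single PID

        # only many independent submissions keep all workers busy
        many = [pool.apply_async(serial_work, (10,)) for _ in range(8)]
        print({res.get() for res in many})  # typically several distinct PIDs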
But as already said in the introduction, your computation will only benefit from multiprocessing if it's heavy enough to earn back the overhead of inter-process communication (and process creation).
My solution below for the general problem uses JoinableQueue in combination with a sentinel value for process termination to synchronize the workflow. I'm adding a function busy_foo to make the computation heavier and show a case where multiprocessing has its benefits.
from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
import time

SENTINEL = 'SENTINEL'


def busy_foo(x=10e6):
    # artificial CPU load to make each work item heavier
    for _ in range(int(x)):
        x -= 1


def enumerate_paths(q_analyze, q_result, n, k):
    """
    John wants to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerates all different ways
    he can go up this flight of stairs.
    """
    for path in iter(q_analyze.get, SENTINEL):
        last_step = path[-1]

        if last_step >= n:
            busy_foo()
            # John has reached the top
            q_result.put(path)
            q_analyze.task_done()
            continue
        else:
            busy_foo()
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                q_analyze.put(extended_path)
            q_analyze.task_done()


if __name__ == '__main__':

    N_CORES = 4

    N = 6
    K = 2

    start = time.perf_counter()

    q_analyze = Queue()
    q_result = Queue()

    q_analyze.put((0,))  # seed the queue with the starting point

    pool = []
    for _ in range(N_CORES):
        pool.append(
            Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
        )

    for p in pool:
        p.start()

    q_analyze.join()  # block until everything is processed

    for p in pool:
        q_analyze.put(SENTINEL)  # let the processes exit gracefully

    results = []
    while not q_result.empty():
        results.append(q_result.get())

    for p in pool:
        p.join()

    print(f'elapsed: {time.perf_counter() - start: .2f} s')
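The "sequential original" referred to in the results below is the code from the question and is not reproduced in this answer. Purely as an assumption of what such a baseline looks like, a minimal sequential version of the same enumeration (my own sketch, not the OP's code) could be:

def busy_foo(x=10e6):
    # same artificial load as above
    for _ in range(int(x)):
        x -= 1


def enumerate_paths_sequential(n, k, heavy=False):
    # depth-first expansion of partial paths; same logic as the worker above,
    # but driven by a plain Python list instead of a JoinableQueue
    results = []
    stack = [(0,)]
    while stack:
        path = stack.pop()
        last_step = path[-1]
        if heavy:
            busy_foo()
        if last_step >= n:
            results.append(path)
        else:
            for i in range(1, k + 1):
                stack.append(path + (last_step + i,))
    return results


print(len(enumerate_paths_sequential(6, 2)))  # 21, matching the result count reported below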
Results
Running the code above with busy_foo commented out takes, for N=30, K=2 (2,178,309 results):
- ~208s N_CORES=4
- 2.78s sequential original
Pickling and unpickling, threads running against locks, etc., account for this huge difference.
Now with busy_foo enabled for both and N=6, K=2 (21 results) it takes:
- 6.45s N_CORES=4
- 30.46s sequential original
Here the computation was heavy enough to allow the overhead to be earned back.
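A rough, machine-dependent micro-benchmark sketch of my own (not from the original timings) can make the proportions visible: serializing one small path tuple is cheap in absolute terms, but it dwarfs the near-zero per-item work of the light version, while a single busy_foo call in turn dwarfs the serialization cost:

import pickle
import time


def busy_foo(x=10e6):
    for _ in range(int(x)):
        x -= 1


path = (0, 1, 2, 3, 4, 5)

# rough proxy for per-item IPC overhead: pickle round-trips of one small tuple
# (the real overhead also includes locks and pipe writes)
start = time.perf_counter()
for _ in range(10_000):
    pickle.loads(pickle.dumps(path))
print(f'10k pickle round-trips: {time.perf_counter() - start:.3f} s')

# one "heavy" work item
start = time.perf_counter()
busy_foo()
print(f'one busy_foo() call:    {time.perf_counter() - start:.3f} s')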
Numpy
Numpy can speed up vectorized operations many times over, but you would likely see performance penalties with numpy on this one. Numpy uses contiguous blocks of memory for its arrays. When you change the array size, the whole array has to be rebuilt, unlike with Python lists.
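As a small, machine-dependent illustration (my own sketch, not part of the original answer): growing a Python list is amortized O(1) per append, while "growing" a NumPy array with np.append copies the whole array on every call:

import time

import numpy as np

N = 20_000

# Python list: append is amortized O(1)
start = time.perf_counter()
lst = []
for i in range(N):
    lst.append(i)
print(f'list.append: {time.perf_counter() - start:.4f} s')

# NumPy array: np.append returns a new, fully copied array every time
start = time.perf_counter()
arr = np.empty(0, dtype=np.int64)
for i in range(N):
    arr = np.append(arr, i)
print(f'np.append:   {time.perf_counter() - start:.4f} s')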