Python: Multithreading Complex Objects
Solution 1:
Here's a complete, executable program that appears to work fine. I expect you're mostly seeing "weird" behavior because, as I suggested in a comment, you're counting job successors instead of job predecessors. So I renamed things with "succ" and "pred" in their names to make that much clearer. Daemon threads are also usually a Bad Idea, so this code arranges to shut down all the threads cleanly when the work is over. Note too the use of assertions to verify that implicit beliefs are actually true ;-)
import threading
import Queue
import random

NTHREADS = 10
NJOBS = 10000

class Job(object):
    def __init__(self, name):
        self.name = name
        self.done = False
        self.succs = []
        self.npreds = 0

    def work(self):
        assert not self.done
        self.done = True
        return True

    def add_dependent(self, another_job):
        self.succs.append(another_job)
        another_job.npreds += 1

def worker(q, lock):
    while True:
        job = q.get()
        if job is None:
            break
        success = job.work()
        if success:
            for succ in job.succs:
                with lock:
                    assert succ.npreds > 0
                    succ.npreds -= 1
                    if succ.npreds == 0:
                        q.put(succ)
        q.task_done()

jobs = [Job(i) for i in range(NJOBS)]
for i, job in enumerate(jobs):
    # pick some random successors
    possible = xrange(i+1, NJOBS)
    succs = random.sample(possible,
                          min(len(possible),
                              random.randrange(10)))
    for succ in succs:
        job.add_dependent(jobs[succ])

q = Queue.Queue()
for job in jobs:
    if job.npreds == 0:
        q.put(job)
print q.qsize(), "ready jobs initially"

lock = threading.Lock()
threads = [threading.Thread(target=worker,
                            args=(q, lock))
           for _ in range(NTHREADS)]
for t in threads:
    t.start()
q.join()
# add sentinels so threads end cleanly
for t in threads:
    q.put(None)
for t in threads:
    t.join()

for job in jobs:
    assert job.done
    assert job.npreds == 0
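The program above is Python 2 (`Queue`, `xrange`, and the `print` statement). As a minimal sketch, the same pattern ported to Python 3's `queue` module might look like the following; the constants are scaled down here purely for a quick run, and the structure is otherwise unchanged:

```python
import queue
import random
import threading

NTHREADS = 4
NJOBS = 1000

class Job:
    def __init__(self, name):
        self.name = name
        self.done = False
        self.succs = []   # jobs that depend on this one
        self.npreds = 0   # number of unfinished prerequisites

    def work(self):
        assert not self.done
        self.done = True
        return True

    def add_dependent(self, another_job):
        self.succs.append(another_job)
        another_job.npreds += 1

def worker(q, lock):
    while True:
        job = q.get()
        if job is None:       # sentinel: time to shut down
            break
        if job.work():
            for succ in job.succs:
                with lock:
                    assert succ.npreds > 0
                    succ.npreds -= 1
                    if succ.npreds == 0:
                        q.put(succ)
        q.task_done()

jobs = [Job(i) for i in range(NJOBS)]
for i, job in enumerate(jobs):
    # pick some random successors among later jobs (edges only go forward)
    possible = range(i + 1, NJOBS)
    for succ in random.sample(possible,
                              min(len(possible), random.randrange(10))):
        job.add_dependent(jobs[succ])

q = queue.Queue()
for job in jobs:
    if job.npreds == 0:
        q.put(job)

lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(q, lock))
           for _ in range(NTHREADS)]
for t in threads:
    t.start()
q.join()                 # wait until every queued job has been processed
for t in threads:        # add sentinels so threads end cleanly
    q.put(None)
for t in threads:
    t.join()

assert all(job.done for job in jobs)
assert all(job.npreds == 0 for job in jobs)
```

Note that `q.join()` works here because each real job put on the queue is matched by exactly one `q.task_done()` call, and the `None` sentinels are only added after the join returns.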
CLARIFYING THE LOCK
In a sense, the lock in this code protects "too much". The potential problem it's addressing is that multiple threads may try to decrement the .npreds member of the same Job object simultaneously. Without mutual exclusion, the stored value at the end may be anywhere from just 1 smaller than its initial value to the correct result (the initial value minus the number of threads trying to decrement it).
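That's because `succ.npreds -= 1` is really a read-modify-write: a load, a decrement, and a store, and two threads can interleave those steps so that one update is lost. A small illustration of the locked decrement (the `Counter` class and the thread/iteration counts here are made up for the demo; the guarantee evaporates if you delete the `with lock:`):

```python
import threading

class Counter:
    """Illustrative stand-in for a Job holding an npreds field."""
    def __init__(self, n):
        self.npreds = n

c = Counter(100000)
lock = threading.Lock()

def dec(n):
    for _ in range(n):
        with lock:          # serialize the read-modify-write;
            c.npreds -= 1   # without this, updates can be lost

threads = [threading.Thread(target=dec, args=(25000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert c.npreds == 0  # 100000 - 4 * 25000, exact only because of the lock
```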
But there's no need to also mutate the queue under lock protection. Queues do their own thread-safe locking. So, e.g., the code could be written like so instead:
for succ in job.succs:
    with lock:
        npreds = succ.npreds = succ.npreds - 1
    assert npreds >= 0
    if npreds == 0:
        q.put(succ)
It's generally best practice to hold a lock for as little time as possible. However, I find this rewrite harder to follow. Pick your poison ;-)
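To see why the queue needs no extra protection, note that the Queue class serializes its own put() and get() calls internally. A small sketch of that, using Python 3's `queue` module (the producer counts here are arbitrary):

```python
import queue
import threading

q = queue.Queue()

def producer(start):
    for i in range(start, start + 1000):
        q.put(i)   # Queue.put is internally locked; no external lock needed

threads = [threading.Thread(target=producer, args=(k * 1000,))
           for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue; safe to rely on empty() here because all producers
# have already been joined.
items = []
while not q.empty():
    items.append(q.get())

assert sorted(items) == list(range(4000))  # nothing lost, nothing duplicated
```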