
Python: Multithreading Complex Objects

class Job(object):
    def __init__(self, name):
        self.name = name
        self.depends = []
        self.waitcount = 0

    def work(self):
        # does some work ...
        pass

Solution 1:

Here's a complete, executable program that appears to work fine. I expect you're mostly seeing "weird" behavior because, as I suggested in a comment, you're counting job successors instead of job predecessors. So I renamed things with "succ" and "pred" in their names to make that much clearer. Daemon threads are also usually a Bad Idea, so this code arranges to shut down all the threads cleanly when the work is over. Note too the use of assertions to verify that implicit beliefs are actually true ;-)

import threading
import Queue
import random

NTHREADS = 10
NJOBS = 10000

class Job(object):
    def __init__(self, name):
        self.name = name
        self.done = False
        self.succs = []
        self.npreds = 0

    def work(self):
        assert not self.done
        self.done = True
        return True

    def add_dependent(self, another_job):
        self.succs.append(another_job)
        another_job.npreds += 1

def worker(q, lock):
    while True:
        job = q.get()
        if job is None:
            break
        success = job.work()
        if success:
            for succ in job.succs:
                with lock:
                    assert succ.npreds > 0
                    succ.npreds -= 1
                    if succ.npreds == 0:
                        q.put(succ)
        q.task_done()

jobs = [Job(i) for i in range(NJOBS)]
for i, job in enumerate(jobs):
    # pick some random successors
    possible = xrange(i+1, NJOBS)
    succs = random.sample(possible,
                          min(len(possible),
                              random.randrange(10)))
    for succ in succs:
        job.add_dependent(jobs[succ])

q = Queue.Queue()
for job in jobs:
    if job.npreds == 0:
        q.put(job)
print q.qsize(), "ready jobs initially"

lock = threading.Lock()
threads = [threading.Thread(target=worker,
                            args=(q, lock))
           for _ in range(NTHREADS)]

for t in threads:
    t.start()
q.join()
# add sentinels so threads end cleanly
for t in threads:
    q.put(None)
for t in threads:
    t.join()
for job in jobs:
    assert job.done
    assert job.npreds == 0

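The program above is Python 2 (the Queue module, xrange, and the print statement). If you're on Python 3, the pattern translates directly; here's a minimal, self-contained sketch of just the sentinel-based clean-shutdown part, with illustrative values like NTHREADS = 4 chosen purely for the example:

import queue        # Python 2's Queue module was renamed queue in Python 3
import threading

NTHREADS = 4

def worker(q):
    while True:
        item = q.get()
        if item is None:    # sentinel: time to exit cleanly
            break
        # ... real work on item would go here ...
        q.task_done()

q = queue.Queue()
for item in range(100):
    q.put(item)

threads = [threading.Thread(target=worker, args=(q,))
           for _ in range(NTHREADS)]
for t in threads:
    t.start()
q.join()            # blocks until task_done() was called for every put()
for _ in threads:
    q.put(None)     # one sentinel per worker thread
for t in threads:
    t.join()
print("all workers exited cleanly")
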
CLARIFYING THE LOCK

In a sense, the lock in this code protects "too much". The potential problem it's addressing is that multiple threads may try to decrement the .npreds member of the same Job object simultaneously. Without mutual exclusion, the stored value at the end of that may be anywhere from 1 smaller than its initial value, to the correct result (the initial value minus the number of threads trying to decrement it).
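
To see why the lock matters, here's a small, self-contained demonstration of that lost-update race. The Counter class and the iteration counts are made up for the example; under CPython it usually loses a large number of decrements, though the exact count varies from run to run and by interpreter:

import threading

class Counter(object):
    def __init__(self, n):
        self.npreds = n     # stands in for a Job's npreds field

def decrement(c, times):
    for _ in range(times):
        c.npreds -= 1       # load, subtract, store: threads can
                            # interleave between these steps

N = 1000000
c = Counter(2 * N)          # 2 threads x N decrements should reach 0
threads = [threading.Thread(target=decrement, args=(c, N))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(c.npreds, "decrements were lost")

Wrapping the decrement in a with lock: block makes the loss count 0 on every run.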

But there's no need to also mutate the queue under lock protection. Queues do their own thread-safe locking. So, e.g., the code could be written like so instead:

for succ in job.succs:
    with lock:
        npreds = succ.npreds = succ.npreds - 1
    assert npreds >= 0
    if npreds == 0:
        q.put(succ)

It's generally best practice to hold a lock for as little time as possible. However, I find this rewrite harder to follow. Pick your poison ;-)
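
If you want the narrow lock without the compressed layout, one hypothetical middle ground (my sketch, not part of the original answer) is to move the locked decrement into a small Job method. This assumes the Job class, worker, q, and lock from the full program above:

# hypothetical helper added to Job:
def dec_npreds(self, lock):
    # hold the lock only for the read-modify-write,
    # then return the new predecessor count
    with lock:
        self.npreds -= 1
        return self.npreds

# the worker's inner loop then reads naturally:
for succ in job.succs:
    npreds = succ.dec_npreds(lock)
    assert npreds >= 0
    if npreds == 0:
        q.put(succ)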
