Python Socket.gethostbyname_ex() Multithread Fails
Solution 1:
You could use a sentinel value to signal the threads that there is no more work, and join the threads instead of using queue.task_done() and queue.join():
#!/usr/bin/env python
import socket
from Queue import Queue
from threading import Thread

def getips(queue):
    for site in iter(queue.get, None):
        try:  # resolve hostname
            result = socket.gethostbyname_ex(site)
        except IOError as e:
            print("error %s reason: %s" % (site, e))
        else:
            print("done %s %s" % (site, result))

def main():
    websites = "youtube google non-existent.example facebook yahoo live".split()
    websites = [name + '.com' for name in websites]

    # Spawn thread pool
    queue = Queue()
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)]
    for t in threads:
        t.daemon = True
        t.start()

    # Place work in queue
    for site in websites:
        queue.put(site)

    # Put sentinel to signal the end
    for _ in threads:
        queue.put(None)

    # Wait for completion
    for t in threads:
        t.join()

main()
The gethostbyname_ex() function is obsolete. To support both IPv4 and IPv6 addresses you could use socket.getaddrinfo() instead.
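A minimal sketch of that switch (the resolve() helper is illustrative, not part of the original answer); getaddrinfo() returns entries for IPv4 and IPv6 alike:

```python
import socket

def resolve(host):
    """Return the unique IP addresses of host, IPv4 and IPv6 alike."""
    # getaddrinfo() yields (family, type, proto, canonname, sockaddr)
    # tuples; sockaddr[0] is the IP address for both address families.
    infos = socket.getaddrinfo(host, None)
    return sorted({info[4][0] for info in infos})

print(resolve("localhost"))  # typically includes '127.0.0.1' and, if enabled, '::1'
```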
Solution 2:
My first idea was that you get errors due to DNS overload - maybe your resolver simply doesn't allow you to make more than a certain number of queries per unit of time.
Besides, I spotted some issues:
You forgot to assign site correctly inside the while loop - which would probably better be replaced by a for loop iterating over the queue, or something similar. In your version, every thread reads the site variable from the module-level namespace, which can lead to some queries being made twice and others being skipped. At that point you also have control over whether the queue still has entries or is awaiting more; if neither is the case, the thread can quit.
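A minimal sketch of the fix described above (the names are illustrative): each worker fetches its own site from the queue instead of reading a shared module-level variable, and quits once the queue is drained:

```python
from queue import Queue, Empty  # Python 3; "from Queue import Queue, Empty" on Python 2

queue = Queue()
for name in ["youtube.com", "google.com", "facebook.com"]:
    queue.put(name)

def worker():
    """Pull items until the queue is drained, then quit the thread."""
    handled = []
    while True:
        try:
            site = queue.get_nowait()  # local variable, one item per iteration
        except Empty:
            return handled             # no entries left: the thread can quit
        handled.append(site)
        queue.task_done()

print(worker())  # a single worker drains everything, in FIFO order
```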
For security reasons, you would be better off doing

def mexec(befehl, args=None):
    cur = conn.cursor()
    cur.execute(befehl, args)

so that afterwards you can do

mexec("UPDATE sites2block SET ip=%s, updated='yes'", result)  # puts site in mysqldb
In order to stay compatible with future protocols, you should use socket.getaddrinfo() instead of socket.gethostbyname_ex(site). There you get all the IPs you want (at first you can limit yourself to IPv4, but switching to IPv6 is easier then) and can maybe put them all into the DB.
For your queue, a code sample could be:

import socket
from Queue import Queue, Empty
from threading import Thread
# queue, websites, num_threads and mexec() are assumed from your original code

def queue_iterator(q):
    """Iterate over the contents of a queue. Waits for new elements as
    long as the queue is still filling."""
    while True:
        try:
            item = q.get(block=q.is_filling, timeout=.1)
            yield item
            q.task_done()  # indicate that task is done
        except Empty:
            # If q is still filling, continue.
            # If q is empty and not filling any longer, return.
            if not q.is_filling:
                return

def getips(i, q):
    for site in queue_iterator(q):
        # -- resolve IP --
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip=%s, updated='yes'", result)  # puts site in mysqldb
        except socket.gaierror:
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes'")

# Indicate it is filling.
queue.is_filling = True

# Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()

# Place work in queue
for site in websites:
    queue.put(site)
queue.is_filling = False  # we are done filling; if q becomes empty, we are done

# Wait until worker threads are done to exit
queue.join()
should do the trick.
Another issue is your parallel inserting into MySQL. You are only allowed to make one MySQL query at a time. So you could either protect the access via threading.Lock() or RLock(), or you could put the answers into another queue which is processed by a dedicated thread, which could even bundle them.
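The second option could be sketched like this (db_rows stands in for the MySQL table, since this sketch does not open a real connection): resolver threads only enqueue their answers, and a single writer thread serializes all database access:

```python
import threading
from queue import Queue  # Python 3; "from Queue import Queue" on Python 2

results = Queue()
db_rows = []  # stands in for the MySQL table in this sketch

def writer():
    # iter(results.get, None) blocks on get() and stops at the None sentinel
    for site, ip in iter(results.get, None):
        db_rows.append((site, ip))  # real code would call mexec("UPDATE ...", ...)

t = threading.Thread(target=writer)
t.start()

# resolver threads would call results.put(...) instead of touching MySQL:
results.put(("google.com", "142.250.72.14"))
results.put(("non-existent.example", "no ip"))

results.put(None)  # shutdown sentinel for the writer
t.join()
print(db_rows)
```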
Solution 3:
You might find it simpler to use concurrent.futures than threading, multiprocessing, or Queue directly:
#!/usr/bin/env python3
import socket
# pip install futures on Python 2.x
from concurrent.futures import ThreadPoolExecutor as Executor

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split() * 100
with Executor(max_workers=20) as pool:
    for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60):
        print(results)
Note: you could easily switch from using threads to processes:

from concurrent.futures import ProcessPoolExecutor as Executor

You need that if gethostbyname_ex() is not thread-safe on your OS, e.g., it might be the case on OS X.
If you'd like to process exceptions that might arise in gethostbyname_ex():
import concurrent.futures

with Executor(max_workers=20) as pool:
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h)
                       for h in hosts)
    for f in concurrent.futures.as_completed(future2host, timeout=60):
        e = f.exception()
        print(f.result() if e is None else "{0}: {1}".format(future2host[f], e))
It is similar to the example from the docs.