Mit*_*ril 4 python queue multithreading
我正在使用线程（threading）和队列（Queue）抓取 URL，并把结果存入数据库。
我只希望由一个线程负责存储工作。
所以我写了如下代码：
import threading
import time
import Queue
# Number of sites to crawl, and number of worker threads fetching them.
site_count, fetch_thread_count = 10, 2
# Work queue of site indices consumed by the FetchThread workers.
site_queue = Queue.Queue()
# Shared list that the fetch threads append to and save() pops from.
proxy_array = list()
class FetchThread(threading.Thread):
    """Worker thread that takes site indices from a queue and records them.

    Each index taken from ``site_queue`` is handed to
    ``get_proxy_one_website``, which appends it to the shared
    ``proxy_array`` list. ``run`` loops forever, so the thread should be
    started as a daemon and completion awaited with ``site_queue.join()``.
    """

    def __init__(self, site_queue, proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array

    def run(self):
        """Process queue items forever, marking each one done exactly once."""
        while True:
            index = self.site_queue.get()
            try:
                self.get_proxy_one_website(index)
            except Exception:
                # One failing site must not kill the worker: report it and
                # carry on with the next item.
                import traceback
                traceback.print_exc()
            finally:
                # Always balance the get(); otherwise a failure leaves an
                # unfinished task behind and site_queue.join() never returns.
                self.site_queue.task_done()

    def get_proxy_one_website(self, index):
        """Print a progress message for site ``index`` and append it to proxy_array."""
        print('{0} fetched site :{1}\n'.format(self.name, index))
        self.proxy_array.append(index)
def save(backlog=10, poll_interval=1):
    """Save items from ``proxy_array`` to the database as the fetchers produce them.

    While fetching is still in progress, items are saved only while more than
    ``backlog`` of them are pending; otherwise this waits up to
    ``poll_interval`` seconds before checking again. Once every item put on
    ``site_queue`` has been marked done, everything left in ``proxy_array``
    is saved and the function returns.

    A short-lived daemon helper thread is started to wait on
    ``site_queue.join()``.

    Args:
        backlog: number of pending items left buffered while fetching
            (defaults to the original threshold of 10; pass 0 to save each
            item as soon as it appears).
        poll_interval: maximum seconds to wait between checks while fetching.
    """
    # site_queue.qsize() == 0 only means nothing is *waiting*; a fetch thread
    # may still be processing the last item and append it after we return.
    # join() returns only after every get() has been matched by task_done(),
    # and FetchThread appends before calling task_done(), so watch join().
    fetching_done = threading.Event()

    def _wait_for_fetchers():
        site_queue.join()
        fetching_done.set()

    waiter = threading.Thread(target=_wait_for_fetchers)
    waiter.daemon = True
    waiter.start()

    while True:
        # Sample the flag BEFORE draining: if it was already set, all appends
        # have happened, so this drain is guaranteed to see every item.
        finished = fetching_done.is_set()
        limit = 0 if finished else backlog
        # Only this function pops, so len() > limit guarantees pop() succeeds.
        while len(proxy_array) > limit:
            print('save :{0} to database\n'.format(proxy_array.pop()))
        if finished:
            print('break')
            break
        # Returns early as soon as fetching completes, instead of a blind sleep.
        fetching_done.wait(poll_interval)
def start_crawl():
    """Start the fetch workers, enqueue every site, save results, then wait.

    Reads the module-level ``site_count``, ``fetch_thread_count``,
    ``site_queue`` and ``proxy_array``. No ``global`` statement is needed
    because those names are only read, never rebound.
    """
    print('init')
    for _ in range(fetch_thread_count):
        worker = FetchThread(site_queue, proxy_array)
        # Workers loop forever; as daemons they don't keep the process alive.
        # ``daemon = True`` replaces the deprecated ``setDaemon(True)``.
        worker.daemon = True
        worker.start()
    print('put site_queue')
    for site_index in range(site_count):
        site_queue.put(site_index)
    # save() runs on this main thread and pops results from proxy_array.
    save()
    print('start site_queue join')
    site_queue.join()
    print('finish')


start_crawl()
Run Code Online (Sandbox Code Playgroud)
输出结果：
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
Run Code Online (Sandbox Code Playgroud)
为什么 save() 看起来是在 site_queue.join() 之后才运行的？而 site_queue.join() 明明写在 save() 后面。
我也尝试过把 save() 改成在线程中运行，但同样不起作用。
这是否意味着我必须把 proxy_array=[] 改成 proxy_queue=Queue.Queue()，才能用 threading 来存储数据？
我只想让一个线程做这件事，而且不会有其他线程从 proxy_array 中取数据，为什么还要对它调用 join()？在这里使用 Queue 显得很奇怪。
有没有更好的解决方案?
更新:
我不想等到所有 FetchThreads 都完成工作后才保存。我想在抓取（fetching）的同时保存数据，这样会快得多。我希望输出类似下面这样（由于我使用的是 array.pop()，"save :0" 可能会出现得很晚，这只是一个便于理解的例子）：
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
Run Code Online (Sandbox Code Playgroud)
更新 2（供遇到同样问题的人参考）：
问题:
正如我在上面所说的，没有任何其他线程会从 proxy_array 获取数据。
我想不通为什么这样会破坏线程安全？
回答:
这是一个生产者-消费者问题，misha 的回答中有详细解释，我仔细阅读后理解了。
问:
还有一个问题：程序的主线程能否像 FetchThreads 一样充当消费者（consumer）？（换句话说，不需要再单独创建一个 StoreThread。）
这一点我还没弄明白，找到答案后会再更新。
我也需要实现一个类似的生产者-消费者模型：生产者生成一个 'id'，消费者用这个 id 去获取某个 URL 并对结果进行处理。下面是我的骨架代码：
import Queue
import random
import threading
import time
import sys
# data_queue carries (a, b) integer pairs from producer threads to consumer
# threads; lock is taken around every print so lines from different threads
# don't interleave.
data_queue, lock = Queue.Queue(), threading.Lock()
def gcd(a, b):
    """Return the greatest common divisor of non-negative integers ``a`` and ``b``.

    Uses Euclid's algorithm: gcd(x, 0) == x and gcd(0, 0) == 0.
    """
    while b != 0:
        a, b = b, a % b
    # When the loop exits, b is 0 and a holds the GCD.
    return a
def consumer(idnum):
    """Take (a, b) pairs from data_queue forever and print their gcd.

    Intended to run as a daemon thread. ``idnum`` only labels the output.
    """
    while True:
        # Blocking get(): the thread sleeps until an item is available instead
        # of spinning on an empty queue and printing an exception each time.
        data = data_queue.get()
        try:
            result = gcd(data[0], data[1])
            with lock:
                print('\t consumer %d: computed gcd(%d, %d) = %d'
                      % (idnum, data[0], data[1], result))
            # Simulated work; done outside the lock so other threads can print.
            time.sleep(1)
        finally:
            # Exactly one task_done() per get(), even if processing fails,
            # so data_queue.join() in the main thread cannot hang.
            data_queue.task_done()
def producer(idnum, count):
    """Put ``count`` random (a, b) pairs on data_queue, pausing 0.5 s after each.

    Each pair is announced on stdout (while holding ``lock``) before it is
    queued. ``idnum`` only labels the output.
    """
    for _ in range(count):
        pair = (random.randint(1, sys.maxint), random.randint(1, sys.maxint))
        with lock:
            print('\t producer %d: generated (%d, %d)' % ((idnum,) + pair))
        data_queue.put(pair)
        time.sleep(0.5)
if __name__ == '__main__':
    num_producers = 1
    num_consumers = 2
    num_integer_pairs = 10

    # consumer() loops forever, so run consumers as daemon threads; the
    # process can exit once the queue has been fully processed.
    for i in range(num_consumers):
        t = threading.Thread(target=consumer, args=(i,))
        t.daemon = True
        t.start()

    threads = []
    for ii in range(num_producers):
        thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
        threads.append(thread)
        thread.start()

    # Wait for the producer threads to finish putting their items.
    for thread in threads:
        thread.join()
    # Consumers may still be printing, so take the lock like every other print.
    with lock:
        print('done with producer threads')

    # Wait until every queued item has been marked task_done(). The consumer
    # threads themselves are still alive (blocked in get()); only the queued
    # work is finished at this point.
    data_queue.join()
    with lock:
        print('all consumer threads finished')
    with lock:
        print('main thread exited')
Run Code Online (Sandbox Code Playgroud)