使用协程与线程时的吞吐量差异

Mri*_*lla 5 python multithreading coroutine gevent

几天前,我提出了一个关于如何帮助我设计构建多个HTTP请求的范例的问题

这是场景.我想拥有一个多生产者,多消费者系统.我的生产者抓取并抓取一些网站,并将它找到的链接添加到队列中.由于我将抓取多个网站,我希望有多个生产者/抓取工具.

消费者/工作者以此队列为食,向这些链接发出TCP/UDP请求并将结果保存到我的Django DB.我还希望有多个工作人员,因为每个队列项目完全相互独立.

人们建议使用coroutine库,即Gevent或Eventlet.从未使用过coroutines,我读到即使编程范例类似于线程范例,只有一个线程正在执行,但是当阻塞调用发生时 - 例如I/O调用 - 堆栈在内存中切换而另一个在绿色中切换线程接管,直到它遇到某种阻塞I/O调用.希望我做对了吗?这是我的一篇SO帖子中的代码:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()
Run Code Online (Sandbox Code Playgroud)

这很有效,因为sleep调用是阻塞调用,当sleep事件发生时,另一个绿色线程接管.这比顺序执行快得多.正如您所看到的,我的程序中没有任何代码可以故意将一个线程执行到另一个线程.我没有看到这适合上面的场景,因为我希望所有线程同时执行.

一切正常,但我觉得我使用Gevent/Eventlets实现的吞吐量高于原始的顺序运行程序,但是远远低于使用真实线程可以实现的吞吐量.

如果我要使用线程机制重新实现我的程序,我的每个生产者和消费者可以同时工作,而不需要像协同程序那样交换堆栈.

是否应该使用线程重新实现?我的设计错了吗?我没有看到使用协同程序的真正好处.

也许我的概念有点浑浊,但这就是我所认同的.对我的范例和概念的任何帮助或澄清都会很棒.

谢谢

jfs*_*jfs 5

正如您所看到的,我的程序中没有任何代码可以故意将一个线程执行到另一个线程.我没有看到这适合上面的场景,因为我希望所有线程同时执行.

有一个OS线程但有几个greenlet.在您的情况下gevent.sleep()允许工作者同时执行.阻止IO调用,例如,urllib2.urlopen(url).read()如果使用urllib2patched gevent(通过调用gevent.monkey.patch_*()),则执行相同的操作.

另请参阅关于协程和并发的好奇课程,以了解代码如何在单线程环境中同时工作.

要比较gevent,线程,多处理之间的吞吐量差异,您可以编写与所有aproaches兼容的代码:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue
Run Code Online (Sandbox Code Playgroud)

所有并发实现的其余部分都是相同的:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))
Run Code Online (Sandbox Code Playgroud)

不要在模块级别执行代码放入main():

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()
Run Code Online (Sandbox Code Playgroud)