gevent block redis'套接字请求

XIO*_*XIO 5 python redis gevent

目标:从redis中生成一些greenlet worker处理数据pop(从redis弹出然后放入队列)

RUNNING ENV:ubuntu 12.04 PYTHON VER:2.7 GEVENT VER:1.0 RC2 REDIS VER:2.6.5 REDIS-PY VER:2.7.1

from gevent import monkey; monkey.patch_all()
import gevent
from gevent.pool import Group
from gevent.queue import JoinableQueue
import redis

tasks = JoinableQueue()
task_group = Group()

def crawler():
    while True:
        if not tasks.empty():
            print tasks.get()
            gevent.sleep()

task_group.spawn(crawler)
redis_client = redis.Redis()
data = redis_client.lpop('test') #<----------Block here
tasks.put(data)
Run Code Online (Sandbox Code Playgroud)

尝试从redis弹出数据,但它被阻止..并且没有异常提出...只是冻结并删除spawn方法,它会工作..我觉得混淆发生了什么,PLZ帮助!你好!

Did*_*zia 10

gevent提供协作轻量级进程(而不是线程).结果是当你在某处有一个无限循环并且调度程序永远不会重新进入时,程序将阻止占用100%的CPU内核.

在您的示例中,问题是您定义了爬网程序循环的方式.显然,当任务为空时,你有一个无限循环.并且因为gevent.sleep调用(将执行必要的yield操作)仅在任务不为空时调用,这意味着调度程序永远不会被重新进入.

它似乎阻止了lpop命令,因为Redis客户端延迟了连接.事件顺序如下:

  • 产生了任务组; 但是还没有安排greenlet
  • redis_client已构建,但由于实际连接已延迟,因此它不会生成I/O.
  • 叫做lpop; 这次真的需要连接,因为Redis客户端必须等待连接和lpop的回复; 因此它产生于调度程序
  • 调度程序激活爬虫工作者
  • 无限循环,因为任务队列仍然是空的

如果你将gevent.sleep()放在循环本身(在if之后),它会更好地工作,但它仍然是一种实现dequeuer的低效方法.这样的事情会好得多:

def crawler():
    while True:
        x = tasks.get()
        try:
            print "Crawler: ",x
        finally:
            tasks.task_done()
Run Code Online (Sandbox Code Playgroud)

get()调用阻塞了worker,因此当队列为空时,它将避免worker和scheduler之间的乒乓游戏.