芹菜工人内的多线程

Dom*_*ral 12 python multithreading api-design rabbitmq celery

我正在使用Celery和RabbitMQ来处理来自API请求的数据.过程如下:

请求 - > API - > RabbitMQ - >芹菜工人 - >返回

理想情况下,我会产生更多的芹菜工人,但我受内存限制的限制.

目前,我的流程中的瓶颈是从传递给worker的URL中获取和下载数据.粗糙,过程看起来像这样:

celery_gets_job(url):
    var data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck)
    var result = processes_data(data) # takes 0.1ss
    return result
Run Code Online (Sandbox Code Playgroud)

这是不可接受的,因为工作人员在获取URL时被锁定了一段时间.我期待通过线程改进这一点,但我不确定最佳做法是什么:

  • 有没有办法让芹菜工作者在不同的线程中同时处理数据时异步下载传入的数据?

  • 我是否可以通过RabbitMQ获取具有某种形式的消息传递的单独工作者获取和处理?

oto*_*las 3

使用该eventlet库,您可以修补标准库以使其异步。

首先导入异步 urllib2:

from eventlet.green import urllib2
Run Code Online (Sandbox Code Playgroud)

所以你会得到 url 正文:

def fetch(url):
    body = urllib2.urlopen(url).read()
    return body
Run Code Online (Sandbox Code Playgroud)

请参阅此处的更多eventlet示例。

  • 另外,直接使用 eventlet 执行池 http://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html 应该自动猴子补丁 io 调用。 (3认同)