In my code I have two hypothetical tasks: one takes URLs from a generator and downloads them in batches using Twisted's Cooperator, and the other takes a downloaded source and parses it asynchronously. I am trying to wrap all of the fetch and parse tasks into a single Deferred object that fires its callback once all pages have been downloaded and all sources have been parsed.

I came up with the following solution:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))
    main_tasks …
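For comparison, here is a minimal sketch of one way to end up with a single Deferred for both phases, assuming the same placeholder get_urls() and parse() helpers as in the code above (an illustrative variant, not a drop-in replacement): each coiterate() call returns a Deferred that fires once the shared generator is exhausted, so a DeferredList over those marks the end of fetching, and gatherResults over the collected parse Deferreds then marks the end of parsing.

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task():
    parse_results = []  # one Deferred per parse() call that has been started

    def process(source):
        # parse() and get_urls() are the same placeholders used above
        parse_results.append(parse(source))

    def fetch_urls():
        for url in get_urls():
            d = getPage(url)
            d.addCallback(process)
            yield d

    coop = task.Cooperator()
    urls = fetch_urls()

    # Each coiterate() Deferred fires when the shared generator is exhausted,
    # so this DeferredList fires once every page has been fetched.
    fetching = defer.DeferredList(
        [coop.coiterate(urls) for _ in xrange(BATCH_SIZE)],
        fireOnOneErrback=True)

    # By that point every parse Deferred is in parse_results, so chaining
    # gatherResults gives one Deferred for "all fetched and all parsed".
    fetching.addCallback(lambda _: defer.gatherResults(parse_results))
    return fetching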
I am using Python and Celery in a project. In the project, I have two files:

celeryconfig.py
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
and example.py
from celery.task import task
import hashlib
md5 = hashlib.md5()
@task
def getDigest(text):
    print 'Using md5 - ', md5
    md5.update(text)
    return md5.digest()
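Note that a hashlib object accumulates state across update() calls, so with the module-level md5 above each call to getDigest() ends up hashing the concatenation of every text seen so far. A small standalone illustration of that accumulation (plain Python, independent of Celery):

import hashlib

m = hashlib.md5()
m.update('foo')
print m.hexdigest()   # digest of 'foo'
m.update('bar')
print m.hexdigest()   # digest of 'foobar', not of 'bar': update() accumulates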
In celeryconfig.py, I set CELERYD_CONCURRENCY to 2, which means that the tasks in my task queue will be distributed to 2 different processes.

From the Python console, I run:
from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')
This creates two tasks that are executed concurrently by the two workers. The problem is that when both worker processes run their task function [getDigest()], they appear to be using the same hash object (md5). The output of celeryd confirms this, as shown below.
[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870> …
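For contrast, here is a minimal sketch of a hypothetical variant (not the project code) in which each invocation builds its own hash object and also logs the worker's pid, which makes it easier to see which pool process handled a given call:

from celery.task import task
import hashlib
import os

@task
def getDigest(text):
    md5 = hashlib.md5()  # a fresh object per call instead of one created at module import
    print 'pid %d using md5 - %r' % (os.getpid(), md5)
    md5.update(text)
    return md5.digest()

Here each digest depends only on the text passed to that one call, rather than on whatever the shared module-level object has already absorbed.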
asynchronous ×1
callback ×1
celery ×1
celery-task ×1
celeryd ×1
deferred ×1
twisted ×1