ery*_*ydo 11 python asynchronous task aggregation celery
我打算使用Celery来处理由我的主服务器发送的事件触发的推送通知和电子邮件.
这些任务需要打开与外部服务器(GCM,APS,电子邮件服务器等)的连接.它们可以一次处理一个,也可以通过单个连接批量处理,以获得更好的性能.
通常会在短时间内单独触发这些任务的几个实例.例如,在一分钟的时间内,可能会有几十个推送通知需要通过不同的消息发送给不同的用户.
在芹菜中处理这个问题的最佳方法是什么?似乎天真的方式是为每条消息简单地创建一个不同的任务,但这需要为每个实例打开一个连接.
我希望会有某种任务聚合器允许我处理例如"所有未完成的推送通知任务".
这样的事情存在吗?有没有更好的方法来解决它,例如附加到活动任务组?
我错过了什么吗?
罗伯特
我最近发现并celery.contrib.batches
在我的项目中实现了该模块.在我看来,这是一个比Tommaso的答案更好的解决方案,因为你不需要额外的存储层.
以下是直接来自文档的示例:
一个点击计数器,每100条消息和每秒刷新一次缓冲区.不对数据执行任何操作,但可以轻松修改以将其存储在数据库中.
# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
from collections import Counter
count = Counter(request.kwargs['url'] for request in requests)
for url, count in count.items():
print('>>> Clicks: {0} -> {1}'.format(url, count))
Run Code Online (Sandbox Code Playgroud)
但要小心,它适用于我的使用,但它提到这是文档中的"实验任务类".这可能会阻止某些人使用具有这种易变性描述的功能:)