Pir*_*App 6 celery python-3.x celerybeat
我每 15 分钟运行一次 celerybeat 调度程序,我需要从 API 获取数据(速率限制 = 300 个请求/分钟最大值)并将结果存储到数据库中。我想同时在速率限制下并行获取 url。如果任何工人在这里失败,我不想重试,因为我会在 15 分钟后再次 ping。关于如何在芹菜中实现这一点的任何建议。
@celery.task(bind=True)
def fetch_store(self):
start = time()
return c.chain(c.group(emap.s() for _ in range(2000)), ereduce.s(start)).apply_async()
@celery.task(rate_limit='300/m')
def fetch():
#... requests data from external API
return data
@celery.task
def store(numbers, start):
end = time()
logger.info("Received" + numbers + " " + (end - start)/1000 + "seconds")
Run Code Online (Sandbox Code Playgroud)
我通常定义一个自定义Task子类并设置max_retries为0(不是None,这会使其永远重试):
class NoRetryTask(Task):
max_retries = 0
...
Run Code Online (Sandbox Code Playgroud)
您也可以作为装饰器在一行中完成它,如下所示:
@app.task(max_retries=0)
def my_func():
...
Run Code Online (Sandbox Code Playgroud)
请参阅文档以获取更多信息。
| 归档时间: |
|
| 查看次数: |
566 次 |
| 最近记录: |