celery中不同时期运行的任务一共有8个。所有这些都是事件驱动的任务。某次事件发生后,他们被解雇了。并且特定任务会持续进行,直到满足某些条件为止。
我已经注册了一项任务,该任务会检查某些条件近两分钟。这个任务在大多数情况下都可以正常工作。但有时任务的预期行为并未达到。
任务签名如下:
任务.py
import time
from celery import shared_task
@shared_task()
def some_celery_task(a, b):
main_time_end = time.time() + 120
while time.time() < main_time_end:
...
# some db operations here with given function arguments 'a' and 'b'
# this part of the task get execute most of the time
if time.time() > main_time_end:
...
# some db operations here.
# this part is the part of the task that doesn't get executed sometimes
Run Code Online (Sandbox Code Playgroud)
视图.py
# the other part of the view …
Run Code Online (Sandbox Code Playgroud) 我正在使用 celery 构建一个 Flask 应用程序来执行后台任务。我的应用程序使用在 Docker 容器中运行的 localstack 来为我的消息代理在本地模拟 SQS。我已经让 Flask 和 celery 在本地运行,以便与 localstack 一起正常工作,我可以看到 Flask 接收请求,将消息添加到 SQS 队列,然后 celery 接收该任务并执行它。
我尝试将 Flask 和 celery 与 localstack 一起进行 dockerize,并且我的所有服务都按预期运行,除了 celery Worker 不执行队列中的任务。我可以在本地启动一个 celery 工作程序来读取队列并执行任务,但 docker celery 工作程序不会提取任何任务。
在 Flask 容器中运行 celery Worker 达到了相同的结果,并添加了--without-gossip
我在这个github 线程中找到的参数。
我是否在 docker 架构中遗漏了一些导致 celery 无法从 SQS 队列中提取的内容?
这是我的 docker-compose.yml:
services:
dev:
build:
context: .
dockerfile: 'dev.Dockerfile'
ports:
- "5050:5000"
restart: always
volumes:
- .:/app
environment:
- GUNICORN_CMD_ARGS="--reload"
- docker_env=true
stdin_open: true …
Run Code Online (Sandbox Code Playgroud) 我想将 celery 任务 id 存储在 Django 数据库的 CharField 模型中。我需要指定最大长度。celery 任务 ID 的最大长度是多少?
我在 celery 中使用任务继承max_retries: 3
来重试( )某些异常并记录失败。
是on_failure
在每次失败的尝试时调用还是仅在最后一次尝试(在我的例子中是第三次)之后调用?
def __call__(self, *args, **kwargs):
try:
return self.run(*args, **kwargs)
except InterfaceError as exc:
self.retry(exc=exc, countdown=5, max_retries=3)
def on_failure(self, exc, task_id, args, kwargs, einfo):
log(exc) # This is a gross simplification of the logging
Run Code Online (Sandbox Code Playgroud) 我正在尝试从本地 Redis 容器转换为 Azure 中的托管服务。
我知道我有正确的服务器名称和密钥,因为
redis-cli -h <server-name>.redis.cache.windows.net -p 6379 -a <azure_key>
Run Code Online (Sandbox Code Playgroud)
连接。
我之前在 celery_manager.py 中的本地连接是
app = Celery(broker=redis://redis:6379)
Run Code Online (Sandbox Code Playgroud)
我在启用非 SSL 端口的情况下成功更新了代理。
broker=redis://:<key>@<server-name>.redis.cache.windows.net:6379 per [this question]
Run Code Online (Sandbox Code Playgroud)
我尝试将经纪人更新为:
broker=redis://:<key>@<server-name>.redis.cache.windows.net:6380
Run Code Online (Sandbox Code Playgroud)
我在 Celery 日志中收到此警告:
[2020-12-03 20:54:00,491: WARNING/celery] Secure redis scheme specified (rediss) with no ssl options, defaulting to insecure SSL behaviour.
Run Code Online (Sandbox Code Playgroud)
django-tasks 中的这个异常:
020-12-03 20:54:31,223 - INFO - runworker - Using single-threaded worker.
2020-12-03 20:54:31,224 - INFO - runworker - Running worker against channel layer default (asgi_redis.core.RedisChannelLayer)
2020-12-03 20:54:31,225 - INFO - …
Run Code Online (Sandbox Code Playgroud) 每当我运行 时celery -A reminders worker -l INFO --detach
,我都会收到以下错误:
zsh: command not found: celery
我的假设是错误位于我的路径变量中,因为这是“找到”这些命令的地方。
$ echo $PATH
$ /opt/homebrew/bin:/opt/homebrew/sbin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
当我跑步时pip3 install celery
,我发现
Requirement already satisfied: celery in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (5.1.2)
Requirement already satisfied: click<8.0,>=7.0 in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from celery) (7.1.2)
...
Requirement already satisfied: amqp<6.0.0,>=5.0.6 in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from kombu<6.0,>=5.1.0->celery) (5.0.9)
Requirement already satisfied: wcwidth in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from prompt-toolkit->click-repl>=0.1.6->celery) (0.2.5)
Run Code Online (Sandbox Code Playgroud)
有人知道此类错误的解决方案吗?谢谢!
我有一个使用 uvicorn 执行的 FastAPI api 代码。现在我想添加一个队列系统,我认为 Celery 和 Flower 对我来说是很好的工具,因为我的 api 有一些端点使用大量 CPU 并且需要几秒钟的时间来响应。但是,我对添加芹菜有几个问题:
我有一些蜘蛛下载页面并将数据存储在数据库中.我创建了使用管理面板(由Flask-Admin扩展)显示数据库的烧瓶应用程序.现在我想将功能附加到我的烧瓶应用程序以控制蜘蛛状态:打开/关闭.
我觉得它可以通过线程或多处理来实现.Celery不是一个好的决定,因为总计划必须使用最少的内存.
选择哪种方法来实现这个功能?
我正在使用Celery和Redis作为代理,我可以看到队列实际上是一个redis列表,其中序列化任务作为项目.
我的问题是,如果我有一个AsyncResult对象作为调用的结果<task>.delay()
,有没有办法确定项目在队列中的位置?
更新:
我终于可以使用以下方式获得该职位:
from celery.task.control import inspect
i = inspect()
i.reserved()
Run Code Online (Sandbox Code Playgroud)
但它有点慢,因为它需要与所有工人沟通.
我试着用pyramid_celery.我设法运行简单的任务.但金字塔如何运行周期性任务呢?我需要写CELERYBEAT_SCHEDULE的地方?