标签: celery

Django 中的 Celery 超时

celery中不同时期运行的任务一共有8个。所有这些都是事件驱动的任务。某次事件发生后,他们被解雇了。并且特定任务会持续进行,直到满足某些条件为止。

我已经注册了一项任务,该任务会检查某些条件近两分钟。这个任务在大多数情况下都可以正常工作。但有时任务的预期行为并未达到。

任务签名如下:

任务.py

import time
from celery import shared_task

@shared_task()
def some_celery_task(a, b):
    main_time_end = time.time() + 120
    while time.time() < main_time_end:
        ...
        # some db operations here with given function arguments 'a' and 'b' 
        # this part of the task get execute most of the time

    if time.time() > main_time_end:
        ...
        # some db operations here.
        # this part is the part of the task that doesn't get executed sometimes

Run Code Online (Sandbox Code Playgroud)

视图.py

# the other part of the view …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task python-3.6

2
推荐指数
1
解决办法
1万
查看次数

Dockerized Celery 工作线程未从 Localstack SQS 队列中获取任务

我正在使用 celery 构建一个 Flask 应用程序来执行后台任务。我的应用程序使用在 Docker 容器中运行的 localstack 来为我的消息代理在本地模拟 SQS。我已经让 Flask 和 celery 在本地运行,以便与 localstack 一起正常工作,我可以看到 Flask 接收请求,将消息添加到 SQS 队列,然后 celery 接收该任务并执行它。

我尝试将 Flask 和 celery 与 localstack 一起进行 dockerize,并且我的所有服务都按预期运行,除了 celery Worker 不执行队列中的任务。我可以在本地启动一个 celery 工作程序来读取队列并执行任务,但 docker celery 工作程序不会提取任何任务。

在 Flask 容器中运行 celery Worker 达到了相同的结果,并添加了--without-gossip我在这个github 线程中找到的参数。

我是否在 docker 架构中遗漏了一些导致 celery 无法从 SQS 队列中提取的内容?

这是我的 docker-compose.yml:


services:
  dev:
    build:
      context: .
      dockerfile: 'dev.Dockerfile'
    ports:
    - "5050:5000"
    restart: always
    volumes:
    - .:/app
    environment:
    - GUNICORN_CMD_ARGS="--reload"
    - docker_env=true
    stdin_open: true …
Run Code Online (Sandbox Code Playgroud)

celery flask docker python-3.6 localstack

2
推荐指数
1
解决办法
2298
查看次数

Celery - 任务 ID 最大长度?

我想将 celery 任务 id 存储在 Django 数据库的 CharField 模型中。我需要指定最大长度。celery 任务 ID 的最大长度是多少?

celery

2
推荐指数
1
解决办法
848
查看次数

重试时celery何时调用on_failure

我在 celery 中使用任务继承max_retries: 3来重试( )某些异常并记录失败。

on_failure在每次失败的尝试时调用还是仅在最后一次尝试(在我的例子中是第三次)之后调用?

def __call__(self, *args, **kwargs):
        try:
            return self.run(*args, **kwargs)
        except InterfaceError as exc:
            self.retry(exc=exc, countdown=5, max_retries=3)

def on_failure(self, exc, task_id, args, kwargs, einfo):
    log(exc)  # This is a gross simplification of the logging
Run Code Online (Sandbox Code Playgroud)

python celery

2
推荐指数
1
解决办法
1026
查看次数

如何从 celery-django 项目安全连接到 Azure redis?

我正在尝试从本地 Redis 容器转换为 Azure 中的托管服务。

我知道我有正确的服务器名称和密钥,因为

redis-cli -h <server-name>.redis.cache.windows.net -p 6379 -a <azure_key>
Run Code Online (Sandbox Code Playgroud)

连接。

我之前在 celery_manager.py 中的本地连接是

app = Celery(broker=redis://redis:6379)
Run Code Online (Sandbox Code Playgroud)

我在启用非 SSL 端口的情况下成功更新了代理。

broker=redis://:<key>@<server-name>.redis.cache.windows.net:6379 per [this question]
Run Code Online (Sandbox Code Playgroud)

我尝试将经纪人更新为:

broker=redis://:<key>@<server-name>.redis.cache.windows.net:6380
Run Code Online (Sandbox Code Playgroud)

我在 Celery 日志中收到此警告:

[2020-12-03 20:54:00,491: WARNING/celery] Secure redis scheme specified (rediss) with no ssl options, defaulting to insecure SSL behaviour.
Run Code Online (Sandbox Code Playgroud)

django-tasks 中的这个异常:

020-12-03 20:54:31,223 - INFO - runworker - Using single-threaded worker.
2020-12-03 20:54:31,224 - INFO - runworker - Running worker against channel layer default (asgi_redis.core.RedisChannelLayer)
2020-12-03 20:54:31,225 - INFO - …
Run Code Online (Sandbox Code Playgroud)

django azure redis celery

2
推荐指数
1
解决办法
2079
查看次数

zsh:找不到命令:celery

每当我运行 时celery -A reminders worker -l INFO --detach,我都会收到以下错误: zsh: command not found: celery 我的假设是错误位于我的路径变量中,因为这是“找到”这些命令的地方。

$ echo $PATH

$ /opt/homebrew/bin:/opt/homebrew/sbin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin

当我跑步时pip3 install celery,我发现

Requirement already satisfied: celery in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (5.1.2)

Requirement already satisfied: click<8.0,>=7.0 in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from celery) (7.1.2)

...

Requirement already satisfied: amqp<6.0.0,>=5.0.6 in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from kombu<6.0,>=5.1.0->celery) (5.0.9)

Requirement already satisfied: wcwidth in /Users/caineardayfio/Library/Python/3.8/lib/python/site-packages (from prompt-toolkit->click-repl>=0.1.6->celery) (0.2.5)
Run Code Online (Sandbox Code Playgroud)

有人知道此类错误的解决方案吗?谢谢!

python path celery

2
推荐指数
1
解决办法
5206
查看次数

Celery、uvicorn 和 FastAPI

我有一个使用 uvicorn 执行的 FastAPI api 代码。现在我想添加一个队列系统,我认为 Celery 和 Flower 对我来说是很好的工具,因为我的 api 有一些端点使用大量 CPU 并且需要几秒钟的时间来响应。但是,我对添加芹菜有几个问题:

  1. 芹菜可以代替 Uvicorn 吗?我还需要它吗?我在网站上看不到任何他们也考虑 uvicorn 的例子,当你执行 Celery 时似乎不需要它......
  2. 我读过很多关于使用 Celery 为 FastAPI 创建队列的内容。但是,您可以在 FastAPI 中管理队列,而无需使用 Celery。什么更好?为什么?

python api celery fastapi uvicorn

2
推荐指数
1
解决办法
4375
查看次数

在Flask中实现单个工作者的最佳方式

我有一些蜘蛛下载页面并将数据存储在数据库中.我创建了使用管理面板(由Flask-Admin扩展)显示数据库的烧瓶应用程序.现在我想将功能附加到我的烧瓶应用程序以控制蜘蛛状态:打开/关闭.

我觉得它可以通过线程或多处理来实现.Celery不是一个好的决定,因为总计划必须使用最少的内存.

选择哪种方法来实现这个功能?

python multithreading multiprocessing celery flask

1
推荐指数
1
解决办法
2017
查看次数

在芹菜中如何获得队列中的任务位置?

我正在使用Celery和Redis作为代理,我可以看到队列实际上是一个redis列表,其中序列化任务作为项目.

我的问题是,如果我有一个AsyncResult对象作为调用的结果<task>.delay(),有没有办法确定项目在队列中的位置?

更新:

我终于可以使用以下方式获得该职位:

from celery.task.control import inspect
i = inspect()
i.reserved()
Run Code Online (Sandbox Code Playgroud)

但它有点慢,因为它需要与所有工人沟通.

python redis celery django-celery djcelery

1
推荐指数
1
解决办法
2527
查看次数

与芹菜的金字塔周期性任务

我试着用pyramid_celery.我设法运行简单的任务.但金字塔如何运行周期性任务呢?我需要写CELERYBEAT_SCHEDULE的地方?

python celery pyramid

1
推荐指数
1
解决办法
1692
查看次数