标签: celery

芹菜任务记录器不写

我对 celery 的记录器有问题。我有一个渲染帧的函数。我记录了我生成的子进程的输出,但似乎只有每个工作人员从队列中挑选的第一个作业被写入。队列中的所有后续任务都不会生成日志文件。我也尝试过使用 python 自己的日志记录,但发生了同样的问题。是否有我可能缺少的配置?

 @task(queue='rndr')
 def rndr(params):

     path      = get_logger_path(params)
     logger    = rndr.get_logger(logfile=path)    
     return render(params, logger)
Run Code Online (Sandbox Code Playgroud)

我以这种方式定义我的任务,因为我的重试记录器的定义不同,即 rndr_retry.get_logger ...

我的 celeryconfig 如下所示:

 BROKER_HOST = "xxx.xxx.xxx.xxx"
 BROKER_PORT = 5672
 BROKER_USER = "xxxx"
 BROKER_PASSWORD = "xxxx"

 CELERY_RESULT_BACKEND = 'amqp'
 CELERY_DISABLE_RATE_LIMITS = True
 CELERY_ACKS_LATE = True
 CELERY_IMPORTS = ['lib.tasks.concatenate', 'lib.tasks.encode', 'lib.tasks.render',  'lib.tasks.still_image', 'lib.tasks.retry']
 CELERY_ROUTES = {'lib.tasks.encode':{'queue': 'encode'},
             'lib.tasks.concatenate':{'queue': 'encode'},
             'lib.tasks.still_image':{'queue': 'encode'},
             'lib.tasks.render':{'queue':'rndr'},
             'lib.tasks.retry':{'queue': 'retry'}
             }
Run Code Online (Sandbox Code Playgroud)

希望有人能解释为什么只有队列中的第一个任务会写...

先感谢您。

更新:根据要求,这里是渲染方法的部分版本,没有所有细节......

def render(params, logger):

    #load params to local values

    try:
        #create subprocess
        output = child_proc.communicate()[0] …
Run Code Online (Sandbox Code Playgroud)

python logging celery

0
推荐指数
1
解决办法
3058
查看次数

Celery 错误:“没有名为 billiard.forking 的模块” - 如何诊断?

我不知道从哪里开始诊断和解决这个问题:

$  bin/django celeryd -l DEBUG -v 3


 -------------- celery@lucid32 v3.0.3 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      django://localhost//
- ** ---------- . app:         default:0x8b0aa4c (djcelery.loaders.DjangoLoader)
- ** ---------- . concurrency: 1 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[Tasks]
  . celery.backend_cleanup
  . celery.chain …
Run Code Online (Sandbox Code Playgroud)

django fork egg celery

0
推荐指数
1
解决办法
2729
查看次数

Python Celery 任务的状态

我想知道在将 celery 任务部署到工作人员后,有哪些选项可用于从浏览器监控 celery 任务?

我当前的应用程序堆栈是一个在扭曲内部运行的烧瓶应用程序,使用 celery 运行数十到数千个小型后台任务(更新存储库中的元数据,创建图像衍生品等)我设想使用 ajax 长轮询来监视状态由用户启动的 celery 任务。我将 redis 用于后端代理和结果。

我看到 celery 有一些命令行方法来监控任务,或者有一个 web 仪表板的。但是,如果我想从发送到 celery 的特定任务中查看更详细的状态,那么该任务打印/写入日志文件,然后长轮询该文件以获取来自flask前端的更改是否更有意义?

此时用户可以说“更新这10,000个项目”,任务被发送到celery,前端很快就说,“作业已发送!”。而且任务确实完成了。但是我想让用户导航到“/status”并查看那 10,000 个小作业的状态 - 即使是滚动日志文件也可能会起作用。

任何建议将不胜感激。花了很多挠头,使这一步勾画出来的东西,但我纺纱我的车轮正好搞清楚什么从用户前端长查询。

python long-polling redis celery flask

0
推荐指数
1
解决办法
2370
查看次数

芹菜是否正确遵循我的速率限制?

我在 Celery 中有一个速率限制任务,但 Flower 中显示的结果使它看起来好像它正在立即处理一堆任务,然后遵守速率限制。为什么会发生这种情况?我需要做些什么来确保它从一开始就遵守速率限制吗?

@shared_task(rate_limit="4/m")
    def my_task(a,b):
        ...
Run Code Online (Sandbox Code Playgroud)

...

my_task.apply_async((x, y,),)
Run Code Online (Sandbox Code Playgroud)

成功的任务

脚步

python django celery flower

0
推荐指数
1
解决办法
1246
查看次数

Celery 与 Redis 对比 vs 单独 Redis

我无法理解使用 Celery 的优势是什么。我知道你可以将 Celery 与 Redis、RabbitMQ 等一起使用,但为什么我不直接获取这些消息队列服务的客户端,而不是将 Celery 放在它前面呢?

python java messaging rabbitmq celery

0
推荐指数
1
解决办法
1239
查看次数

如何在创建相同对象 24 小时后删除对象

我想在 24 小时后删除一个实例,因为该实例已创建如何使用 celery

创建实例后如何启动“TIMER”?

我想要像 Snapchat 这样的东西

database django django-models celery

0
推荐指数
1
解决办法
2151
查看次数

Celery:为某些任务分配特定的时间限制

我有一个这样写的任务:

@async_runner.app.task(name='task_name')
def async_task():
    async_runner.send_task(
        task_fn=task_processing,
        queue='queue_name',
        options=async_runner.DEFAULT_RETRY_POLICY
    )
Run Code Online (Sandbox Code Playgroud)

我的默认任务时间限制是 30 分钟。我想将此特定任务的时间限制增加到 1 小时。

如何为这一项任务设置不同的时间限制?

我已经看过这个,但我的问题是特定于 Flask 以及如何在 Flask 中配置 Celery。谢谢。

python celery

0
推荐指数
1
解决办法
1715
查看次数

为 Django 项目中的可重用应用程序创建基于类的 Celery 任务

对于 Django 项目来说,创建基于函数的任务非常干净。只需在 django 应用程序中创建tasks.py并开始编写任务,就像这个示例一样,该示例取自官方celery文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y
Run Code Online (Sandbox Code Playgroud)

但有时基于函数的任务是紧密耦合的并且可重用性不高。所以我想创建基于类的芹菜任务,该任务记录在官方网站中。在遵循https://github.com/celery/celery/issues/3874之后,我可以创建示例任务,但我不确定它是否是创建基于类的任务的正确方法。

from __future__ import absolute_import, unicode_literals
from celery import shared_task, Task
import time
from celery import current_app


@shared_task
def add(x, y):
    time.sleep(5)
    return x + y


@shared_task
def mul(x, y):
    return x * y

# Sample class based task for testing
class AnotherTask(current_app.Task):
    name = 'tasks.another_task'
    def …
Run Code Online (Sandbox Code Playgroud)

python django task celery django-celery

0
推荐指数
1
解决办法
5948
查看次数

我可以使用环境变量让 Celery worker 连接到不同的 worker 吗?

我正在按照手册中描述的方式启动 Celery 工人......

$ stoneid salimfadhley$ celery worker

 -------------- celery@TCR-C02X29QPJHC9.local v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Darwin-18.2.0-x86_64-i386-64bit 2018-11-13 14:00:20
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         default:0x10fb4ee48 (.default.Loader)
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues] …
Run Code Online (Sandbox Code Playgroud)

celery docker

0
推荐指数
1
解决办法
2410
查看次数

我们如何在 Django 中创建异步 API?

我想创建一个第三方聊天机器人 API,它是异步的,并在 10 秒暂停后回复“ok”。

import time

def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    return wait()
Run Code Online (Sandbox Code Playgroud)

我已经尝试了 celery,如下所示,我正在等待 celery 响应:

import time
from celery import shared_task

@shared_task
def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    a = wait.delay()
    work = AsyncResult(a.id)
    while True:
        if work.ready():
           return work.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)

但是这个解决方案是同步工作的,没有区别。我们如何在不要求用户继续请求直到收到结果的情况下使其异步?

python django asynchronous celery django-celery

0
推荐指数
1
解决办法
5058
查看次数