标签: celery

Celery - 获取当前任务的任务ID

如何从任务中获取任务的task_id值?这是我的代码:

from celery.decorators import task
from django.core.cache import cache

@task
def do_job(path):
    "Performs an operation on a file"

    # ... Code to perform the operation ...

    cache.set(current_task_id, operation_results)
Run Code Online (Sandbox Code Playgroud)

我的想法是,当我创建任务的新实例时,我task_id从任务对象中检索它.然后我使用任务ID来确定任务是否已完成.我不想通过path值跟踪任务,因为文件在任务完成后被"清理",并且可能存在也可能不存在.

在上面的例子中,我将如何获得值current_task_id

python django celery

71
推荐指数
3
解决办法
4万
查看次数

如何手动从shell运行芹菜定期任务?

我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?

python django celery celery-task django-celery

68
推荐指数
3
解决办法
3万
查看次数

Rabbitmq或Gearman - 选择一个工作队列

在工作中,我们需要构建一个作业服务器,用于发送电子邮件,构建PDF,处理一些数据等.显然,我们希望建立一些通用的排队系统.我熟悉Gearman,这是它试图解决的确切问题:将工作放在工作人员来接他们的队列中.但是,我看到很多关于Rabbitmq的提及,我不清楚它在这种情况下是如何使用的.

Rabbitmq是构建分布式作业系统的好框架吗?

message-queue gearman rabbitmq celery

67
推荐指数
2
解决办法
4万
查看次数

了解芹菜任务预取

我刚刚发现了配置选项CELERYD_PREFETCH_MULTIPLIER(docs).默认值为4,但(我相信)我希望预取或尽可能低.我现在把它设置为1,这足够接近我正在寻找的东西,但仍有一些我不明白的东西:

  1. 为什么这个预取是一个好主意?除非消息队列和工作人员之间存在大量延迟(在我的情况下,他们当前在同一主机上运行,​​最糟糕的情况可能最终在同一数据中的不同主机上运行),我真的没有理由看到它.中央).文档仅提到了缺点,但未能解释其优点.

  2. 许多人似乎把它设置为0,期望能够以这种方式关闭预取(在我看来是一个合理的假设).但是,0表示无限预取.为什么有人会想要无限制的预取,这不是完全消除了你首先引入任务队列的并发/异步性吗?

  3. 为什么不能关闭预取?在大多数情况下,关闭性能可能不是一个好主意,但有技术上的原因是不可能的吗?还是只是没有实施?

  4. 有时,此选项已连接到CELERY_ACKS_LATE.例如.Roger Hu写道 «[...]通常[用户]真正想要的是让工人只保留与子进程一样多的任务.但是,如果没有启用延迟确认,这是不可能的[...]»我不明白这两个选项是如何连接的,以及为什么没有另一个选项是不可能的.可以在此处找到关于连接的另一个提及.有人可以解释为什么这两个选项是连接的吗?

python celery celeryd

67
推荐指数
4
解决办法
1万
查看次数

Celery并行分布式任务与多处理

我有一个CPU密集型Celery任务.我想在许多EC2实例中使用所有处理能力(核心)来更快地完成这项工作(我认为芹菜并行分布式多任务处理任务).

术语,线程,多处理,分布式计算,分布式并行处理都是我试图更好理解的术语.

示例任务:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()
Run Code Online (Sandbox Code Playgroud)

使用上面的代码(如果可能的话,有一个例子)如何使用Celery分配这个任务,允许利用云中所有可用机器的所有计算CPU功率来分离这一任务?

python django multithreading multiprocessing celery

66
推荐指数
3
解决办法
3万
查看次数

为什么使用Celery而不是RabbitMQ?

根据我的理解,Celery是一个分布式任务队列,这意味着它应该做的唯一事情是将任务/作业分派给其他服务器并获得结果.RabbitMQ是一个消息队列,仅此而已.但是,工作人员只能在收到消息时监听MQ并执行任务.这完全符合Celery的要求,为什么需要Celery呢?

python message-queue rabbitmq celery

65
推荐指数
2
解决办法
2万
查看次数

以指数退避重试Celery任务

对于这样的任务:

from celery.decorators import task

@task()
def add(x, y):
    if not x or not y:
        raise Exception("test error")
    return self.wait_until_server_responds(
Run Code Online (Sandbox Code Playgroud)

如果它抛出异常并且我想从守护进程端重试它,那么如何应用指数退避算法,即在等待2^2, 2^3,2^4秒之后?

也是从服务器端维护的重试,这样如果工作人员碰巧被杀死,那么产生的下一个工作人员将进行重试任务?

python celery django-celery

65
推荐指数
2
解决办法
2万
查看次数

将所有celery任务的日志消息发送到单个文件

我想知道如何设置更具体的日志系统.我的所有任务都使用

logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)

作为模块范围的记录器.

我希望celery登录到"celeryd.log"并将我的任务记录到"tasks.log",但我不知道如何让它工作.使用CELERYD_LOG_FILE来自django-celery我可以将所有与celeryd相关的日志消息路由到celeryd.log,但是没有在我的任务中创建的日志消息的跟踪.

python logging celery

64
推荐指数
2
解决办法
5万
查看次数

Django Celery采伐最佳实践

我正在努力让Celery登录工作Django.我有登录设置settings.py进入控制台(这是我正在托管的工作正常Heroku).在每个模块的顶部,我有:

import logging
logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)

在我的tasks.py中,我有:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
Run Code Online (Sandbox Code Playgroud)

这适用于从任务记录调用,我得到如下输出:

2012-11-13T18:05:38+00:00 app[worker.1]: [2012-11-13 18:05:38,527: INFO/PoolWorker-2] Syc feed is starting
Run Code Online (Sandbox Code Playgroud)

但是如果该任务然后调用另一个模块中的queryset方法,例如方法,我会得到重复的日志条目,例如

2012-11-13T18:00:51+00:00 app[worker.1]: [INFO] utils.generic_importers.ftp_processor process(): File xxx.csv already imported. Not downloaded
2012-11-13T18:00:51+00:00 app[worker.1]: [2012-11-13 18:00:51,736: INFO/PoolWorker-6] File xxx.csv already imported. Not downloaded
Run Code Online (Sandbox Code Playgroud)

我想我可以用

CELERY_HIJACK_ROOT_LOGGER = False
Run Code Online (Sandbox Code Playgroud)

只是使用Django日志记录但是当我尝试它时这不起作用,即使我确实让它工作,我也会失去"PoolWorker-6"我想要的那一点.(顺便说一句,我无法弄清楚如何在Celery的日志条目中显示任务名称,因为文档似乎表明它应该).

我怀疑我在这里缺少一些简单的东西.

python django logging celery django-celery

64
推荐指数
4
解决办法
4万
查看次数

芹菜与RabbitMQ:AttributeError:'DisabledBackend'对象没有属性'_get_task_meta_for'

我正在使用Celery Tutorial运行第一步.

我们定义以下任务:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

然后叫它:

>>> from tasks import add
>>> add.delay(4, 4)
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Run Code Online (Sandbox Code Playgroud)

我正在运行芹菜工作者和兔子mq服务器.相当奇怪的是,芹菜工作者将任务报告为成功:

[2014-04-22 19:12:03,608: INFO/MainProcess] Task test_celery.add[168c7d96-e41a-41c9-80f5-50b24dcaff73] succeeded in 0.000435483998444s: 19 
Run Code Online (Sandbox Code Playgroud)

为什么这不起作用?

python celery

60
推荐指数
4
解决办法
2万
查看次数