如何从任务中获取任务的task_id值?这是我的代码:
from celery.decorators import task
from django.core.cache import cache
@task
def do_job(path):
"Performs an operation on a file"
# ... Code to perform the operation ...
cache.set(current_task_id, operation_results)
Run Code Online (Sandbox Code Playgroud)
我的想法是,当我创建任务的新实例时,我task_id从任务对象中检索它.然后我使用任务ID来确定任务是否已完成.我不想通过path值跟踪任务,因为文件在任务完成后被"清理",并且可能存在也可能不存在.
在上面的例子中,我将如何获得值current_task_id?
我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?
在工作中,我们需要构建一个作业服务器,用于发送电子邮件,构建PDF,处理一些数据等.显然,我们希望建立一些通用的排队系统.我熟悉Gearman,这是它试图解决的确切问题:将工作放在工作人员来接他们的队列中.但是,我看到很多关于Rabbitmq的提及,我不清楚它在这种情况下是如何使用的.
Rabbitmq是构建分布式作业系统的好框架吗?
我刚刚发现了配置选项CELERYD_PREFETCH_MULTIPLIER(docs).默认值为4,但(我相信)我希望预取或尽可能低.我现在把它设置为1,这足够接近我正在寻找的东西,但仍有一些我不明白的东西:
为什么这个预取是一个好主意?除非消息队列和工作人员之间存在大量延迟(在我的情况下,他们当前在同一主机上运行,最糟糕的情况可能最终在同一数据中的不同主机上运行),我真的没有理由看到它.中央).文档仅提到了缺点,但未能解释其优点.
许多人似乎把它设置为0,期望能够以这种方式关闭预取(在我看来是一个合理的假设).但是,0表示无限预取.为什么有人会想要无限制的预取,这不是完全消除了你首先引入任务队列的并发/异步性吗?
为什么不能关闭预取?在大多数情况下,关闭性能可能不是一个好主意,但有技术上的原因是不可能的吗?还是只是没有实施?
有时,此选项已连接到CELERY_ACKS_LATE.例如.Roger Hu写道 «[...]通常[用户]真正想要的是让工人只保留与子进程一样多的任务.但是,如果没有启用延迟确认,这是不可能的[...]»我不明白这两个选项是如何连接的,以及为什么没有另一个选项是不可能的.可以在此处找到关于连接的另一个提及.有人可以解释为什么这两个选项是连接的吗?
我有一个CPU密集型Celery任务.我想在许多EC2实例中使用所有处理能力(核心)来更快地完成这项工作(我认为芹菜并行分布式多任务处理任务).
术语,线程,多处理,分布式计算,分布式并行处理都是我试图更好理解的术语.
示例任务:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Run Code Online (Sandbox Code Playgroud)
使用上面的代码(如果可能的话,有一个例子)如何使用Celery分配这个任务,允许利用云中所有可用机器的所有计算CPU功率来分离这一任务?
根据我的理解,Celery是一个分布式任务队列,这意味着它应该做的唯一事情是将任务/作业分派给其他服务器并获得结果.RabbitMQ是一个消息队列,仅此而已.但是,工作人员只能在收到消息时监听MQ并执行任务.这完全符合Celery的要求,为什么需要Celery呢?
对于这样的任务:
from celery.decorators import task
@task()
def add(x, y):
if not x or not y:
raise Exception("test error")
return self.wait_until_server_responds(
Run Code Online (Sandbox Code Playgroud)
如果它抛出异常并且我想从守护进程端重试它,那么如何应用指数退避算法,即在等待2^2, 2^3,2^4秒之后?
也是从服务器端维护的重试,这样如果工作人员碰巧被杀死,那么产生的下一个工作人员将进行重试任务?
我想知道如何设置更具体的日志系统.我的所有任务都使用
logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)
作为模块范围的记录器.
我希望celery登录到"celeryd.log"并将我的任务记录到"tasks.log",但我不知道如何让它工作.使用CELERYD_LOG_FILE来自django-celery我可以将所有与celeryd相关的日志消息路由到celeryd.log,但是没有在我的任务中创建的日志消息的跟踪.
我正在努力让Celery登录工作Django.我有登录设置settings.py进入控制台(这是我正在托管的工作正常Heroku).在每个模块的顶部,我有:
import logging
logger = logging.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)
在我的tasks.py中,我有:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
Run Code Online (Sandbox Code Playgroud)
这适用于从任务记录调用,我得到如下输出:
2012-11-13T18:05:38+00:00 app[worker.1]: [2012-11-13 18:05:38,527: INFO/PoolWorker-2] Syc feed is starting
Run Code Online (Sandbox Code Playgroud)
但是如果该任务然后调用另一个模块中的queryset方法,例如方法,我会得到重复的日志条目,例如
2012-11-13T18:00:51+00:00 app[worker.1]: [INFO] utils.generic_importers.ftp_processor process(): File xxx.csv already imported. Not downloaded
2012-11-13T18:00:51+00:00 app[worker.1]: [2012-11-13 18:00:51,736: INFO/PoolWorker-6] File xxx.csv already imported. Not downloaded
Run Code Online (Sandbox Code Playgroud)
我想我可以用
CELERY_HIJACK_ROOT_LOGGER = False
Run Code Online (Sandbox Code Playgroud)
只是使用Django日志记录但是当我尝试它时这不起作用,即使我确实让它工作,我也会失去"PoolWorker-6"我想要的那一点.(顺便说一句,我无法弄清楚如何在Celery的日志条目中显示任务名称,因为文档似乎表明它应该).
我怀疑我在这里缺少一些简单的东西.
我正在使用Celery Tutorial运行第一步.
我们定义以下任务:
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud)
然后叫它:
>>> from tasks import add
>>> add.delay(4, 4)
Run Code Online (Sandbox Code Playgroud)
但是我收到以下错误:
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Run Code Online (Sandbox Code Playgroud)
我正在运行芹菜工作者和兔子mq服务器.相当奇怪的是,芹菜工作者将任务报告为成功:
[2014-04-22 19:12:03,608: INFO/MainProcess] Task test_celery.add[168c7d96-e41a-41c9-80f5-50b24dcaff73] succeeded in 0.000435483998444s: 19
Run Code Online (Sandbox Code Playgroud)
为什么这不起作用?