标签: celery-task

芹菜 - 在链子里面的小组

我想在链中使用一个组(或块),例如:

chain(getRange.s(3),  GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)

GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题但是没有解释如何将组中的输出传递给链中的下一个任务.

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

7
推荐指数
1
解决办法
3772
查看次数

在芹菜中,如何确保在工人崩溃时重试任务

首先,请不要考虑这个问题作为一个重复 这个问题

我有一个设置环境,使用celeryredisas brokerresult_backend.我的问题是如何确保当芹菜工人崩溃时,所有计划的任务都会重新尝试,当芹菜工人回来时.

我已经看到了使用的建议CELERY_ACKS_LATE = True,以便代理将重新驱动任务,直到它得到一个ACK,但在我的情况下它不起作用.每当我安排一项任务时,它会立即发送到工作人员,该工作人员将其持续到预定的执行时间.让我举个例子:

我正在安排这样的任务:res=test_task.apply_async(countdown=600),但是在芹菜工人日志中,我可以看到类似的东西:Got task from broker: test_task[a137c44e-b08e-4569-8677-f84070873fc0] eta:[2013-01-...].现在,当我杀死芹菜工人时,这些预定的任务就会丢失.我的设置:

BROKER_URL = "redis://localhost:6379/0"  
CELERY_ALWAYS_EAGER = False  
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"  
CELERY_ACKS_LATE = True
Run Code Online (Sandbox Code Playgroud)

scheduled-tasks redis celery celery-task django-celery

7
推荐指数
1
解决办法
3783
查看次数

在数据库中存储每个芹菜任务的任务ID

我用过芹菜MySQL.我想将任务id存储为数据库或celery变量中的普通整数task.我怎样才能做到这一点?

django celery celery-task django-celery

7
推荐指数
1
解决办法
5785
查看次数

Celery任务不会在Django测试中抛出异常

我有几个芹菜任务,包含在我的Django测试中.不幸的是,当通过.delay()调用任务时,不会抛出异常.我将CELERY_ALWAYS_EAGER设置为True.

tasks.py

import celeryapp as app

@app.task()
def exception_task():
    print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
    raise Exception('foo')
Run Code Online (Sandbox Code Playgroud)

tests.py

def test_exception_in_task(self):
        from tasks import exception_task
        exception_task.delay()
Run Code Online (Sandbox Code Playgroud)

产量

CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s
Run Code Online (Sandbox Code Playgroud)

删除.delay时,测试将退出并出现错误:

ERROR: test_exception_in_task
Exception: foo
Run Code Online (Sandbox Code Playgroud)

版本

celery==3.1.4
Django==1.6.4
Run Code Online (Sandbox Code Playgroud)

python testing django celery celery-task

7
推荐指数
2
解决办法
680
查看次数

芹菜如何运作?

我最近开始致力于分布式计算,以提高计算速度.我选择了芹菜.但是,我对某些术语不太熟悉.所以,我有几个相关的问题.

来自Celery文档:

什么是任务队列?

...

Celery通过消息进行通信,通常使用经纪人在客户和工人之间进行调解.为了启动任务,客户端向队列添加消息,然后代理将该消息传递给工作者.

什么是客户(这里)?什么是经纪人?为什么通过经纪人传递消息?为什么Celery会使用后端和队列进行进程间通信?

当我通过发出命令执行Celery控制台时

celery worker -A tasks --loglevel=info --concurrency 5
Run Code Online (Sandbox Code Playgroud)

这是否意味着Celery控制台是一个工作进程,它负责5个不同的进程并跟踪任务队列?当新任务被推入任务队列时,此工作人员是否将任务/作业分配给5个进程中的任何进程?

worker multiprocessing celery broker celery-task

7
推荐指数
1
解决办法
1559
查看次数

我如何在芹菜中指定SQS队列名称

我需要更换我redis与经纪人SQS的经纪人,而googleing它,我碰到很多页面,告诉如何使用SQScelery.根据我的理解,它创建自己的SQS队列,我只有一个任务,并希望使用已经创建的SQS队列.

python asynchronous amazon-sqs celery celery-task

7
推荐指数
2
解决办法
1967
查看次数

在 celery 中,当任务排队时,将上下文元数据从发送方进程传递给工作人员的适当方法是什么?

当任何芹菜任务排队时,我想添加工作人员将能够使用的上下文元数据。

以下代码示例有效,但我想要一个合适的芹菜式解决方案。

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    task_kwags = body[1]
    metadata = {"foo": "bar"}
    task_kwags['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    metadata = kwargs['kwargs'].pop('__metadata__', {})
    # metadata == {"foo": "bar"}
Run Code Online (Sandbox Code Playgroud)

celery celery-task django-celery

7
推荐指数
1
解决办法
1878
查看次数

Celery - 如何通过任务 id 获取任务名称?

Celery - 底线:我想通过使用任务 id 来获取任务名称(我没有任务对象)

假设我有这个代码:

res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
Run Code Online (Sandbox Code Playgroud)

然后在其他地方:

task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
Run Code Online (Sandbox Code Playgroud)

我知道如果我有一个任务对象,我可以得到任务名称,比如这里:celery: get function name by task id? . 但是我没有任务对象(也许它可以由 task_id 或其他方式创建?我在文档中没有看到任何与此相关的内容)。

另外,我不想在缓存中保存任务名称。(假设我有一个很长的链/其他 celery 原语,我不想保存它们的所有名称/task_ids。仅最后一个 task_id 应该足以获取有关所有任务的所有信息,使用 .parents 等)

我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。提前致谢

PS:AsyncResult.name 总是返回 None:

result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

7
推荐指数
1
解决办法
1663
查看次数

如果使用相同参数创建新任务,Celery 会撤销旧任务

我无法理解 Celery 队列和撤销。我的主要问题是,如果创建了具有相同名称和参数的新任务,如何撤销(停止执行)队列中已有的任务。所以在伪代码中

new_task_to_be_added = some_task(1,2)
if exists some_task in queue where some_task.args = (1,2):
    remove such task from queue
new_task_to_be_added.add_to_queue()
Run Code Online (Sandbox Code Playgroud)

这与密码重置机制相关 - 用户单击“创建临时密码”,并有 5 分钟的时间使用此密码创建新的永久密码。如果此时未设置新密码,则帐户应被锁定。但是,如果在 4 分钟后用户创建了一个新的临时密码,他应该再次有 5 分钟的时间来更改它,但“锁定帐户”任务将在 1 分钟内触发 - 我想停止它并始终只使用最新的任务(使用相同参数)

这不是以下内容的副本: Revoke celerytaskswithsameargs/kwargs 这实际上是相反的问题

django scheduled-tasks celery celery-task django-celery

7
推荐指数
0
解决办法
311
查看次数

python celery -A 的无效值无法加载应用程序

我有一个以下项目目录:

azima:
    __init.py
    main.py
    tasks.py
Run Code Online (Sandbox Code Playgroud)

任务.py

from .main import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
Run Code Online (Sandbox Code Playgroud)

主要.py

from celery import Celery

app = Celery('azima', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0', include=['azima.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
Run Code Online (Sandbox Code Playgroud)

当我运行命令来初始化 celery 工作人员时,出现以下错误:

(azima_venv) brightseid@darkseid:~$ celery -A azima worker -l INFO
Usage: celery [OPTIONS] COMMAND [ARGS]...

Error: Invalid value for '-A' / …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

7
推荐指数
1
解决办法
2万
查看次数