标签: celery-task

如何使用 celery 获取任务的执行时间?

我想获取任何任务的执行时间,假设我有task_id。有没有办法获取执行时间?我知道时间将由 celery 记录,但我不想解析日志文件。类似地,有没有办法获取任务的等待时间,换句话说,从任务被调度到工作节点的时间到它开始执行的时间。基本上我想捕获任何任务的整个生命周期。

python celery celery-task

5
推荐指数
0
解决办法
1264
查看次数

如何覆盖 celery 任务的后端

我们使用 Redis 作为结果后端。然而,对于一项任务,我们希望重写它以使用 RabbitMQ。

Task.backend的文档说:

用于此任务的结果存储后端。默认为 CELERY_RESULT_BACKEND 设置

所以我假设我们可以设置Task.backendCELERY_RESULT_BACKEND.

所以我尝试这个:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"
Run Code Online (Sandbox Code Playgroud)

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...
Run Code Online (Sandbox Code Playgroud)

然而,工人失败了:

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

5
推荐指数
1
解决办法
4626
查看次数

获取celery组任务抛出的异常对象

我有一个团体对象。我不想等到所有任务完成并打印结果。这是我的代码:

tasks = [my_task.s(some_val=val).set(queue='different_q') for val in val_list]
job = group(tasks)
job_result = job.apply_async()
results = job_result.get()
Run Code Online (Sandbox Code Playgroud)

如果一切按预期进行,那么我得到的结果

[val1, val2, val3]
Run Code Online (Sandbox Code Playgroud)

但是,如果其中一个任务抛出异常,则该任务job_result.get()也会抛出异常。我知道chord我可以传播异常,但我找不到正确的方法来传播异常group
这是想要的结果:

[val1, Exception('some exception'), val3]
Run Code Online (Sandbox Code Playgroud)

并且job_result.get()不会抛出异常。

exception celery celery-task

5
推荐指数
0
解决办法
222
查看次数

另一个函数内部的 Flask Celery update_state

我想从另一个函数更新我的 Celery 任务的状态。这是我现在所拥有的:

路线

@app.route('/my-long-function', methods=['POST'])
def my_long_function():

    param1 = request.form['param1']
    param2 = request.form['param2']

    task = outside_function.delay(param1, param2)

    return task.id
Run Code Online (Sandbox Code Playgroud)

Celery Task - 在后台启动 some_python_script.handle

@celery.task(name='outside_function')
def outside_function(param1, param2):
    with app.app_context():
        some_python_script.handle(param1, param2)
Run Code Online (Sandbox Code Playgroud)

some_python_script.handle:

def handle(param1, param2):
    param1 + param2
    # many, many different things
Run Code Online (Sandbox Code Playgroud)

理想情况下,我希望能够 self.update_state celery 任务,以便我可以轻松地从我的应用程序请求其状态,如下所示:

some_python_script.handle(理想情况下):

def handle(param1, param2):
    param1 + param2
    # many, many different things
    self.outside_function.update_state('PROGRESS', meta = {'status':'progressing'})
Run Code Online (Sandbox Code Playgroud)

检查进度(理想情况下):

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = outside_function.AsyncResult(task_id)
    response = {
    'state': task.state,
    'id': task.id,
    'status' …
Run Code Online (Sandbox Code Playgroud)

celery flask celery-task

5
推荐指数
1
解决办法
1280
查看次数

Celery - 批量队列任务

我有一些代码将大量(1000s)芹菜任务排队,例如,假设是这样的:

for x in xrange(2000):
    example_task.delay(x)
Run Code Online (Sandbox Code Playgroud)

有没有更好/更有效的方法来一次排队大量任务?他们都有不同的论点。

python celery celery-task

5
推荐指数
2
解决办法
4398
查看次数

Celery 4.0.2 AsyncResult.then 不工作

我想采用文档中所示的承诺协议。那里提供的示例有效,但承诺是在工作人员方面处理的。相反,我想在客户端得到通知。

这是我的test.py:

from celery import Celery

app = Celery(broker='amqp://', backend='rpc')

@app.task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

在客户端,我输入以下命令:

import test
test.add.delay(2, 2).then(lambda: print('OK'))
Run Code Online (Sandbox Code Playgroud)

在谷歌搜索时,我遇到了这个问题,所以显然我不是唯一一个努力理解它应该如何工作的人。

我的理解是,一旦任务被处理,结果应该被发送回客户端,然后回调应该被触发,但事实并非如此,我的承诺永远不会得到解决。

我的理解正确吗?这是理想的行为吗?

谢谢

python asynchronous celery promise celery-task

5
推荐指数
1
解决办法
739
查看次数

在多个应用程序中使用 Celery 队列

你如何为多个应用程序使用同名的 Celery 队列?

我有一个包含 N 个客户端数据库的应用程序,这些数据库都需要在特定队列 M 上处理 Celery 任务。

对于每个客户端数据库,我有一个单独的 celery worker,我启动如下:

celery worker -A client1 -n client1@%h -P solo -Q long
celery worker -A client2 -n client2@%h -P solo -Q long
celery worker -A client3 -n client3@%h -P solo -Q long
Run Code Online (Sandbox Code Playgroud)

当我同时运行所有工作程序并尝试向 client1 启动任务时,我发现它似乎从未执行过。然后我杀死了除第一个之外的所有工人,现在第一个工人接收并执行任务。事实证明,即使每个工人的应用程序使用不同的 BROKER_URL,使用相同的队列也会导致他们窃取彼此的任务。

这让我感到惊讶,因为如果我不指定-Q,这意味着 Celery 从“默认”队列中提取,这不会发生。

如何使用我的自定义队列防止这种情况?是在队列名称中包含客户端 ID 的唯一解决方案吗?还是有更“合适”的解决方案?

celery celery-task django-celery

5
推荐指数
1
解决办法
2197
查看次数

Flask Celery 任务锁定

我正在将 Flask 与 Celery 一起使用,并且我正在尝试锁定特定任务,以便一次只能运行一个任务。在 celery 文档中,它给出了执行此Celery 文档的示例,确保一次只执行一个任务。给出的这个例子是针对 Django 的,但是我正在使用 Flask 我已尽力将其转换为与 Flask 一起使用,但是我仍然看到具有锁的 myTask1 可以多次运行。

我不清楚的一件事是我是否正确使用缓存,我以前从未使用过它,所以所有这些对我来说都是新的。提到但未解释的文档中的一件事是

文档注释:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

我不确定这意味着什么,我应该将缓存与数据库结合使用,如果是这样,我该怎么做?我正在使用 mongodb。在我的代码中,我只是为缓存设置了这个设置,cache = Cache(app, config={'CACHE_TYPE': 'simple'})因为这就是 Flask-Cache 文档的Flask-Cache Docs 中提到的

我不清楚的另一件事是,当我myTask1从 Flask 路线内打电话给我时,我是否需要做任何不同的事情task1

这是我正在使用的代码示例。

from flask import (Flask, render_template, flash, redirect, …
Run Code Online (Sandbox Code Playgroud)

python celery flask celery-task flask-cache

5
推荐指数
2
解决办法
3676
查看次数

Celery: it ignores the exchange name when sending a task

I have a very simple code:

celery = Celery(broker="amqp://guest:guest@172.17.0.1:5672/")                                                                                                                                                                                                           
celery.send_task(                                                                                                                                                                                                                            
    "robot.worker",                                                                                                                                                                                                                
    kwargs={},                                                                                                                                                                                                                               
    exchange="I_am_useless", # with  exchange=Exchange("I_am_useless") I got the same results                                                                                                                                                                                                          
)                                                                                                                                                                                                                                            
Run Code Online (Sandbox Code Playgroud)

我真的需要将任务发送到交换“I_am_useless”,但是它没有发送到那里,当我在 AMQP 协议级别调试时,我看到发送了一个发布事件(可以)但具有以下特征

reserved-1': 0,
'exchange-name': '',
'routing-key': 'celery',
'mandatory': False,
'immediate': False
Run Code Online (Sandbox Code Playgroud)

因此似乎该参数完全被忽略,因为即使在交换声明事件期间,也会使用名称“celery”。但根据文档send_task采用与https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async相同的参数,我们可以看到有一个应接受字符串的“交换”参数。

但是如果我直接打电话给kombu

rabbit_url = "amqp://guest:guest@172.17.0.1:5672/"                                                                                                                                                                                           
conn = Connection(rabbit_url)                                                                                                                                                                                                               
channel = conn.channel()                                                                                                                                                                                                                    
exchange = Exchange("example-exchange", type="direct")                                                                                                                                                                                      
producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")                                                                                                                                                                  
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")                                                                                                                                                                   
queue.maybe_bind(conn)                                                                                                                                                                                                                      
queue.declare()                                                                                                                                                                                                                             
producer.publish("Hello there!")                                                                                                                                                                                                            

Run Code Online (Sandbox Code Playgroud)

我正确地看到了交易所名称,所以我想知道我做错了什么?

python celery celery-task kombu

5
推荐指数
1
解决办法
230
查看次数

如何从 django 模板暂停和停止 celery 任务

我正在开发一个使用 celery 异步运行任务的 django 应用程序。现在,用户可以从网页提交表单以启动 celery 任务。但是无法通过单击 django 模板中的按钮来暂停或停止任务。

到目前为止,这是我的代码

芹菜任务

@shared_task
def get_website(website):
    website_list = return_website_list(website)

    return website_list
Run Code Online (Sandbox Code Playgroud)

在上面的任务中,我调用了一个return_website_list()函数来抓取所需的网站并返回该网站的链接列表。

output.html 模板

<div class="container">
    <button class="pause_btn" type="button">Pause task</button>
    <button class="resume_btn" type="button">Resume task</button>
    <button class="stop_btn" type="button">Stop task</button>
</div>
Run Code Online (Sandbox Code Playgroud)

我希望能够在单击暂停按钮时无限期暂停任务并在单击恢复按钮时恢复任务或在单击停止按钮时完全停止任务的能力。

视图.py

def index(request):

    if request.method == 'POST':
        website = request.POST.get('website-name')
   
        get_website.delay(website)
        return redirect('output')

    return render(request, 'index.html')
Run Code Online (Sandbox Code Playgroud)

我在网上搜索,像这些link1link2link3。但是这些链接并没有帮助我实现我想要做的事情。

提前致谢

python django celery celery-task

5
推荐指数
1
解决办法
465
查看次数