我想获取任何任务的执行时间,假设我有task_id。有没有办法获取执行时间?我知道时间将由 celery 记录,但我不想解析日志文件。类似地,有没有办法获取任务的等待时间,换句话说,从任务被调度到工作节点的时间到它开始执行的时间。基本上我想捕获任何任务的整个生命周期。
我们使用 Redis 作为结果后端。然而,对于一项任务,我们希望重写它以使用 RabbitMQ。
Task.backend的文档说:
用于此任务的结果存储后端。默认为 CELERY_RESULT_BACKEND 设置
所以我假设我们可以设置Task.backend为CELERY_RESULT_BACKEND.
所以我尝试这个:
celeryconfig.py
CELERY_RESULT_BACKEND = "redis://redis-host:7777"
Run Code Online (Sandbox Code Playgroud)
tasks.py
@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
...
Run Code Online (Sandbox Code Playgroud)
然而,工人失败了:
[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
self.run()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
self.after_fork()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
self.initializer(*self.initargs)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
app=app)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
Run Code Online (Sandbox Code Playgroud) 我有一个团体对象。我不想等到所有任务完成并打印结果。这是我的代码:
tasks = [my_task.s(some_val=val).set(queue='different_q') for val in val_list]
job = group(tasks)
job_result = job.apply_async()
results = job_result.get()
Run Code Online (Sandbox Code Playgroud)
如果一切按预期进行,那么我得到的结果
[val1, val2, val3]
Run Code Online (Sandbox Code Playgroud)
但是,如果其中一个任务抛出异常,则该任务job_result.get()也会抛出异常。我知道chord我可以传播异常,但我找不到正确的方法来传播异常group。
这是想要的结果:
[val1, Exception('some exception'), val3]
Run Code Online (Sandbox Code Playgroud)
并且job_result.get()不会抛出异常。
我想从另一个函数更新我的 Celery 任务的状态。这是我现在所拥有的:
@app.route('/my-long-function', methods=['POST'])
def my_long_function():
param1 = request.form['param1']
param2 = request.form['param2']
task = outside_function.delay(param1, param2)
return task.id
Run Code Online (Sandbox Code Playgroud)
@celery.task(name='outside_function')
def outside_function(param1, param2):
with app.app_context():
some_python_script.handle(param1, param2)
Run Code Online (Sandbox Code Playgroud)
def handle(param1, param2):
param1 + param2
# many, many different things
Run Code Online (Sandbox Code Playgroud)
理想情况下,我希望能够 self.update_state celery 任务,以便我可以轻松地从我的应用程序请求其状态,如下所示:
def handle(param1, param2):
param1 + param2
# many, many different things
self.outside_function.update_state('PROGRESS', meta = {'status':'progressing'})
Run Code Online (Sandbox Code Playgroud)
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = outside_function.AsyncResult(task_id)
response = {
'state': task.state,
'id': task.id,
'status' …Run Code Online (Sandbox Code Playgroud) 我有一些代码将大量(1000s)芹菜任务排队,例如,假设是这样的:
for x in xrange(2000):
example_task.delay(x)
Run Code Online (Sandbox Code Playgroud)
有没有更好/更有效的方法来一次排队大量任务?他们都有不同的论点。
我想采用文档中所示的承诺协议。那里提供的示例有效,但承诺是在工作人员方面处理的。相反,我想在客户端得到通知。
这是我的test.py:
from celery import Celery
app = Celery(broker='amqp://', backend='rpc')
@app.task
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud)
在客户端,我输入以下命令:
import test
test.add.delay(2, 2).then(lambda: print('OK'))
Run Code Online (Sandbox Code Playgroud)
在谷歌搜索时,我遇到了这个问题,所以显然我不是唯一一个努力理解它应该如何工作的人。
我的理解是,一旦任务被处理,结果应该被发送回客户端,然后回调应该被触发,但事实并非如此,我的承诺永远不会得到解决。
我的理解正确吗?这是理想的行为吗?
谢谢
你如何为多个应用程序使用同名的 Celery 队列?
我有一个包含 N 个客户端数据库的应用程序,这些数据库都需要在特定队列 M 上处理 Celery 任务。
对于每个客户端数据库,我有一个单独的 celery worker,我启动如下:
celery worker -A client1 -n client1@%h -P solo -Q long
celery worker -A client2 -n client2@%h -P solo -Q long
celery worker -A client3 -n client3@%h -P solo -Q long
Run Code Online (Sandbox Code Playgroud)
当我同时运行所有工作程序并尝试向 client1 启动任务时,我发现它似乎从未执行过。然后我杀死了除第一个之外的所有工人,现在第一个工人接收并执行任务。事实证明,即使每个工人的应用程序使用不同的 BROKER_URL,使用相同的队列也会导致他们窃取彼此的任务。
这让我感到惊讶,因为如果我不指定-Q,这意味着 Celery 从“默认”队列中提取,这不会发生。
如何使用我的自定义队列防止这种情况?是在队列名称中包含客户端 ID 的唯一解决方案吗?还是有更“合适”的解决方案?
我正在将 Flask 与 Celery 一起使用,并且我正在尝试锁定特定任务,以便一次只能运行一个任务。在 celery 文档中,它给出了执行此Celery 文档的示例,确保一次只执行一个任务。给出的这个例子是针对 Django 的,但是我正在使用 Flask 我已尽力将其转换为与 Flask 一起使用,但是我仍然看到具有锁的 myTask1 可以多次运行。
我不清楚的一件事是我是否正确使用缓存,我以前从未使用过它,所以所有这些对我来说都是新的。提到但未解释的文档中的一件事是
In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
我不确定这意味着什么,我应该将缓存与数据库结合使用,如果是这样,我该怎么做?我正在使用 mongodb。在我的代码中,我只是为缓存设置了这个设置,cache = Cache(app, config={'CACHE_TYPE': 'simple'})因为这就是 Flask-Cache 文档的Flask-Cache Docs 中提到的
我不清楚的另一件事是,当我myTask1从 Flask 路线内打电话给我时,我是否需要做任何不同的事情task1
这是我正在使用的代码示例。
from flask import (Flask, render_template, flash, redirect, …Run Code Online (Sandbox Code Playgroud) I have a very simple code:
celery = Celery(broker="amqp://guest:guest@172.17.0.1:5672/")
celery.send_task(
"robot.worker",
kwargs={},
exchange="I_am_useless", # with exchange=Exchange("I_am_useless") I got the same results
)
Run Code Online (Sandbox Code Playgroud)
我真的需要将任务发送到交换“I_am_useless”,但是它没有发送到那里,当我在 AMQP 协议级别调试时,我看到发送了一个发布事件(可以)但具有以下特征
reserved-1': 0,
'exchange-name': '',
'routing-key': 'celery',
'mandatory': False,
'immediate': False
Run Code Online (Sandbox Code Playgroud)
因此似乎该参数完全被忽略,因为即使在交换声明事件期间,也会使用名称“celery”。但根据文档send_task采用与https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async相同的参数,我们可以看到有一个应接受字符串的“交换”参数。
但是如果我直接打电话给kombu
rabbit_url = "amqp://guest:guest@172.17.0.1:5672/"
conn = Connection(rabbit_url)
channel = conn.channel()
exchange = Exchange("example-exchange", type="direct")
producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")
queue.maybe_bind(conn)
queue.declare()
producer.publish("Hello there!")
Run Code Online (Sandbox Code Playgroud)
我正确地看到了交易所名称,所以我想知道我做错了什么?
我正在开发一个使用 celery 异步运行任务的 django 应用程序。现在,用户可以从网页提交表单以启动 celery 任务。但是无法通过单击 django 模板中的按钮来暂停或停止任务。
到目前为止,这是我的代码
芹菜任务
@shared_task
def get_website(website):
website_list = return_website_list(website)
return website_list
Run Code Online (Sandbox Code Playgroud)
在上面的任务中,我调用了一个return_website_list()函数来抓取所需的网站并返回该网站的链接列表。
output.html 模板
<div class="container">
<button class="pause_btn" type="button">Pause task</button>
<button class="resume_btn" type="button">Resume task</button>
<button class="stop_btn" type="button">Stop task</button>
</div>
Run Code Online (Sandbox Code Playgroud)
我希望能够在单击暂停按钮时无限期暂停任务并在单击恢复按钮时恢复任务或在单击停止按钮时完全停止任务的能力。
视图.py
def index(request):
if request.method == 'POST':
website = request.POST.get('website-name')
get_website.delay(website)
return redirect('output')
return render(request, 'index.html')
Run Code Online (Sandbox Code Playgroud)
我在网上搜索,像这些link1,link2,link3。但是这些链接并没有帮助我实现我想要做的事情。
提前致谢
celery ×10
celery-task ×10
python ×7
flask ×2
asynchronous ×1
django ×1
exception ×1
flask-cache ×1
kombu ×1
promise ×1