如何测试是否仍在芹菜中处理任务(task_id)?我有以下场景:
有任何想法吗?可以查找芹菜正在处理的所有任务并检查我的是否仍然存在?
我安装了django-celery,我试图启动工作服务器,但是我得到一个没有实现函数的OSError.我在VPS上运行CentOS 5.4版(最终版):
. broker -> amqp://guest@localhost:5672/
. queues ->
. celery -> exchange:celery (direct) binding:celery
. concurrency -> 4
. loader -> djcelery.loaders.DjangoLoader
. logfile -> [stderr]@WARNING
. events -> OFF
. beat -> OFF
[2010-07-22 17:10:01,364: WARNING/MainProcess] Traceback (most recent call last):
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "manage.py", line 11, in <module>
[2010-07-22 17:10:01,364: WARNING/MainProcess] execute_manager(settings)
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "/usr/local/lib/python2.6/site-packages/django/core/management/__init__.py", line 438, in execute_manager
[2010-07-22 17:10:01,364: WARNING/MainProcess] utility.execute()
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "/usr/local/lib/python2.6/site-packages/django/core/management/__init__.py", line 379, in execute
[2010-07-22 17:10:01,365: …Run Code Online (Sandbox Code Playgroud) 我希望celery任务能够获取执行它的工作人员的名字,以便进行日志记录.我需要在任务中处理这个问题,而不是直接查询代理.有没有办法做到这一点?我正在使用芹菜和RabbitMQ,如果这很重要的话.
我使用MySql作为芹菜的结果后端.我也想在DB中存储程序参数.
例如:
add.apply_async(args=[num1, num2, user]
Run Code Online (Sandbox Code Playgroud)
在这种情况下,我想在DB中存储用户参数,以便我可以稍后查询.
目前,我返回提供的参数,该参数存储在DB中.
def add(num1, num2, user):
return [num1+num2, user]
Run Code Online (Sandbox Code Playgroud)
但是,当任务处于运行状态时,未插入用户,我无法在DB中查询它.这有什么解决方案/黑客吗?
Flask有这个设置SERVER_NAME,大部分时间都很方便.由于我的应用程序有后台任务,我使用Flask上下文.要使上下文工作,SERVER_NAME应该设置.
当SERVER_NAME设置传入的请求进行检查,以匹配该值或路径没有找到.当放置一个nginx(或其他网络服务器在前面时,SERVER_NAME还应该包括端口和反向代理应该处理重写的东西,隐藏端口号从外部世界(它做).对于会话cookie在现代浏览器中工作代理通网址的名称应该是一样的SERVER_NAME,否则浏览器拒绝发送的cookie.这可以通过增加官方来解决hostname的/etc/hosts,它指向127.0.0.1.
有一件事我还没弄清楚,它是后台任务中的网址.url_for与_external选项一起使用,以在其发出的邮件中生成URL.但是该URL包含端口,这当然不同于443我运行的nginx端口...
从中移除端口SERVER_NAME会使第一段中描述的内容失败.
那么我处理url_for邮件的最佳选择是什么?创建单独的配置设置?创建我自己的url_for?
我对芹菜配置有点陌生。
我有一个名为myapp.tasks.my_task例如的任务。
myapp.tasks.my_task当我使用 时,我可以在 celery 的注册任务中看到celery inspect registered。不是说任务注册成功了吗?为什么它会引发以下错误:
KeyError celery.worker.consumer.consumer in on_task_received
Received unregistered task of type 'my_app.tasks.my_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Run Code Online (Sandbox Code Playgroud)
还有其他任务my_app.tasks,它们工作正常,但只有这个任务不起作用并得到KeyError:
@shared_task(queue='celery')
def other_task():
""" …Run Code Online (Sandbox Code Playgroud) 我无法找到使用自己的task_id设置task_id的任何示例
沿着这些方向......
def testview1(request):
for i in xrange(0,1000):
result = add.delay( i, 4,task_id = i)
print result.info
#value = result.wait()
return HttpResponse("Done")
@task()
def add(task_id, x, y):
print add.task_id
print str(x+y)
return x + y
Run Code Online (Sandbox Code Playgroud) 我正在使用官方文档中描述的芹菜组开始一组芹菜任务
我还将组(taskset)id存储到db中,以便轮询celery以获取任务集状态.
job = group([
single_test.s(1, 1),
single_test.s(1, 2),
single_test.s(1, 3),
])
result = job.apply_async()
test_set = MyTestSet()
test_set.taskset_id = result.id
# store test_set into DB
Run Code Online (Sandbox Code Playgroud)
有没有办法result从任务集ID开始获取GroupResult对象(即我的)?像这个问题所做的事情,但与芹菜组合作.
我已经尝试过:
r = GroupResult(taskset_id)
Run Code Online (Sandbox Code Playgroud)
但它不起作用,因为r.results()它总是空的.
我应该使用GroupResult.save()和GroupResult.restore() 方法吗?
我们使用celery进行异步后台任务,我们有2个队列用于不同的优先级任务.我们有2个节点集群分别为它们服务.事情正如预期的那样运作良好.
题:
我们主要得到低优先级的任务.为了优化资源利用率,我想知道有没有办法配置工作者(监听高优先级队列)来监听两个队列.但只要有一些工作,就从优先级较高的队列中获取工作?否则,回退到低优先级队列.
我已经完成了@ Celery Task Priority讨论的基于优先级的任务调度.
但我的问题是优先排队,而不仅仅是队列中的任务.
在flask应用程序中使用带有SQS的芹菜,
但芹菜同时接收相同的任务两次,任务ID相同,
像这样运行工人,
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8
这是芹菜的日志
[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp
Run Code Online (Sandbox Code Playgroud)
收到两次相同的任务,并且两者同时运行,
在 pyhton 烧瓶中使用 celery==4.4.0rc4、boto3==1.9.232、kombu==4.6.6 和 SQS。
在 SQS 中,默认可见性超时为 30 分钟,我的任务没有 ETA 也没有确认
我的任务.py
from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(flask_app) …Run Code Online (Sandbox Code Playgroud) celery-task ×10
celery ×8
python ×7
django ×3
flask ×2
amazon-sqs ×1
email ×1
mysql ×1
python-3.x ×1