标签: celery-task

测试芹菜任务是否仍在处理中

如何测试是否仍在芹菜中处理任务(task_id)?我有以下场景:

  1. 在Django视图中启动任务
  2. 将BaseAsyncResult存储在会话中
  3. 关闭芹菜守护程序(硬)以便不再处理任务
  4. 检查任务是否"死"

有任何想法吗?可以查找芹菜正在处理的所有任务并检查我的是否仍然存在?

python django celery celery-task

11
推荐指数
1
解决办法
4086
查看次数

Django Celery实现 - OSError:[Errno 38]函数未实现

我安装了django-celery,我试图启动工作服务器,但是我得到一个没有实现函数的OSError.我在VPS上运行CentOS 5.4版(最终版):

    . broker -> amqp://guest@localhost:5672/
    . queues ->
        . celery -> exchange:celery (direct) binding:celery
    . concurrency -> 4
    . loader -> djcelery.loaders.DjangoLoader
    . logfile -> [stderr]@WARNING
    . events -> OFF
    . beat -> OFF
[2010-07-22 17:10:01,364: WARNING/MainProcess] Traceback (most recent call last):
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "manage.py", line 11, in <module>
[2010-07-22 17:10:01,364: WARNING/MainProcess] execute_manager(settings)
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "/usr/local/lib/python2.6/site-packages/django/core/management/__init__.py", line 438, in execute_manager
[2010-07-22 17:10:01,364: WARNING/MainProcess] utility.execute()
[2010-07-22 17:10:01,364: WARNING/MainProcess] File "/usr/local/lib/python2.6/site-packages/django/core/management/__init__.py", line 379, in execute
[2010-07-22 17:10:01,365: …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task

10
推荐指数
1
解决办法
5972
查看次数

从芹菜任务中获取芹菜工人的名字?

我希望celery任务能够获取执行它的工作人员的名字,以便进行日志记录.我需要在任务中处理这个问题,而不是直接查询代理.有没有办法做到这一点?我正在使用芹菜和RabbitMQ,如果这很重要的话.

python celery celery-task

10
推荐指数
3
解决办法
5618
查看次数

如何将celery作业的参数输入到要查询的数据库中; 使用mysql

我使用MySql作为芹菜的结果后端.我也想在DB中存储程序参数.

例如:

add.apply_async(args=[num1, num2, user]
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我想在DB中存储用户参数,以便我可以稍后查询.

目前,我返回提供的参数,该参数存储在DB中.

def add(num1, num2, user):
    return [num1+num2, user]
Run Code Online (Sandbox Code Playgroud)

但是,当任务处于运行状态时,未插入用户,我无法在DB中查询它.这有什么解决方案/黑客吗?

python mysql celery celery-task

10
推荐指数
1
解决办法
428
查看次数

Flask SERVER_NAME设置了最佳实践

Flask有这个设置SERVER_NAME,大部分时间都很方便.由于我的应用程序有后台任务,我使用Flask上下文.要使上下文工作,SERVER_NAME应该设置.

SERVER_NAME设置传入的请求进行检查,以匹配该值或路径没有找到.当放置一个nginx(或其他网络服务器在前面时,SERVER_NAME还应该包括端口和反向代理应该处理重写的东西,隐藏端口号从外部世界(它做).对于会话cookie在现代浏览器中工作代理通网址的名称应该是一样的SERVER_NAME,否则浏览器拒绝发送的cookie.这可以通过增加官方来解决hostname/etc/hosts,它指向127.0.0.1.

有一件事我还没弄清楚,它是后台任务中的网址.url_for与_external选项一起使用,以在其发出的邮件中生成URL.但是该URL包含端口,这当然不同于443我运行的nginx端口...

从中移除端口SERVER_NAME会使第一段中描述的内容失败.

那么我处理url_for邮件的最佳选择是什么?创建单独的配置设置?创建我自己的url_for

email flask celery-task nginx-reverse-proxy

10
推荐指数
1
解决办法
322
查看次数

KeyError 在注册任务时在 celery 上收到类型为“”的未注册任务

我对芹菜配置​​有点陌生。

我有一个名为myapp.tasks.my_task例如的任务。

myapp.tasks.my_task当我使用 时,我可以在 celery 的注册任务中看到celery inspect registered。不是说任务注册成功了吗?为什么它会引发以下错误:

KeyError celery.worker.consumer.consumer in on_task_received

Received unregistered task of type 'my_app.tasks.my_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

Run Code Online (Sandbox Code Playgroud)

还有其他任务my_app.tasks,它们工作正常,但只有这个任务不起作用并得到KeyError

@shared_task(queue='celery')
def other_task():
   """ …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task python-3.x

10
推荐指数
2
解决办法
8259
查看次数

你在哪里设置芹菜任务的task_id?

我无法找到使用自己的task_id设置task_id的任何示例

沿着这些方向......

def testview1(request):
    for i in xrange(0,1000):
        result = add.delay( i, 4,task_id = i)
        print result.info
        #value = result.wait()
    return HttpResponse("Done") 


@task()
def add(task_id, x, y):
    print add.task_id
    print str(x+y)
    return x + y
Run Code Online (Sandbox Code Playgroud)

python celery-task django-celery

9
推荐指数
1
解决办法
6368
查看次数

从Celery的taskset_id中检索GroupResult?

我正在使用官方文档中描述的芹菜组开始一组芹菜任务

我还将组(taskset)id存储到db中,以便轮询celery以获取任务集状态.

job = group([
        single_test.s(1, 1),
        single_test.s(1, 2),
        single_test.s(1, 3),
    ])

result = job.apply_async()

test_set = MyTestSet()
test_set.taskset_id = result.id

# store test_set into DB
Run Code Online (Sandbox Code Playgroud)

有没有办法result从任务集ID开始获取GroupResult对象(即我的)?像这个问题所做的事情,但与芹菜组合作.

我已经尝试过:

r = GroupResult(taskset_id)
Run Code Online (Sandbox Code Playgroud)

但它不起作用,因为r.results()它总是空的.

我应该使用GroupResult.save()GroupResult.restore() 方法吗?

python celery celery-task

9
推荐指数
1
解决办法
3194
查看次数

优先考虑芹菜中多个队列中的队列?

我们使用celery进行异步后台任务,我们有2个队列用于不同的优先级任务.我们有2个节点集群分别为它们服务.事情正如预期的那样运作良好.

题:

我们主要得到低优先级的任务.为了优化资源利用率,我想知道有没有办法配置工作者(监听高优先级队列)来监听两个队列.但只要有一些工作,就从优先级较高的队列中获取工作?否则,回退到低优先级队列.

我已经完成了@ Celery Task Priority讨论的基于优先级的任务调度.

但我的问题是优先排队,而不仅仅是队列中的任务.

celery celery-task

9
推荐指数
2
解决办法
846
查看次数

Celery + SQS 两次接收相同的任务,同时具有相同的任务 ID

在flask应用程序中使用带有SQS的芹菜,
芹菜同时接收相同的任务两次,任务ID相同

像这样运行工人,
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

这是芹菜的日志

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp
Run Code Online (Sandbox Code Playgroud)

收到两次相同的任务,并且两者同时运行,

在 pyhton 烧瓶中使用 celery==4.4.0rc4、boto3==1.9.232、kombu==4.6.6 和 SQS。
在 SQS 中,默认可见性超时为 30 分钟,我的任务没有 ETA 也没有确认

我的任务.py

from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(flask_app) …
Run Code Online (Sandbox Code Playgroud)

amazon-sqs celery flask celery-task

9
推荐指数
1
解决办法
312
查看次数