标签: celery-task

如何在 Celery Worker 中正确连接 mongodb?

学习 Celery,阅读Celery 最佳实践,并有一个关于 Celery 数据库使用的非常简单的问题。

德尼·贝尔托维奇 说:

您不应将数据库对象(例如您的用户模型)传递给后台任务,因为序列化对象可能包含过时的数据。

那么,如果我想连接到工作人员中的数据库,正确的选择是什么:

@app.task
def add(x, y, collection):
    client = MongoClient('mongodb://localhost:27017/')
    db = client.wakawaka
    db[collection].insert_one({'sum':x+y})
    return True
Run Code Online (Sandbox Code Playgroud)

或者:

client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka

@app.task
def add(x, y, collection):
    db[collection].insert_one({'sum':x+y})
    return True
Run Code Online (Sandbox Code Playgroud)

UPD:我可以close()在每个任务结束时连接 mongodb,因此每次我需要某些东西时,任务都会连接到新的数据库,并且不会浪费资源。不过,我需要多次打开/关闭数据库连接吗?或者我可以连接一次并以某种方式刷新连接以检索新版本的数据库?

python mongodb celery pymongo celery-task

6
推荐指数
1
解决办法
4352
查看次数

如何向特定的worker注册Celery任务?

我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。

我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。

celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)

一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。

但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。

以下是我启动时工作人员的输出。

$ celery -A proj worker -Q A -c 1 -l INFO

 -------------- celery@pet_sms v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f298a10d208
- ** ---------- .> transport:   redis://localhost:6379/0
- ** …
Run Code Online (Sandbox Code Playgroud)

celery celery-task django-celery celerybeat

6
推荐指数
1
解决办法
2738
查看次数

Celery - 检查工作人员是否收到 SIGTERM

我有一个Celery相当长的任务。超过几分钟。

有时,由于各种原因,一个工作人员被标记为终止,而另一个工作人员则开始工作。如果需要更换运行它的计算机,或者正在部署新的代码版本,则可能会发生这种情况。在这种情况下,工作线程会收到 SIGTERM 信号。

我想知道任务本身是否可以定期检查该工作线程是否已收到 SIGTERM 并且正在等待终止,在这种情况下,只需将任务放回队列中并终止即可。(然后该任务将在另一个工作人员上启动,并继续执行其工作)

编辑:澄清 - 是否可以在任务中检查它是否在等待终止的工作线程上执行。像这样:

# Some long task that can take even a few hours.
def some_task(...):
    for i in range(...):
        do_some_work()
        # That's the missing function:
        if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
             # stop the task in the middle, and it will be executed again later
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

6
推荐指数
1
解决办法
6757
查看次数

通过task_id在celery中重试任务

我已经启动了很多任务,但其中一些还没有完成(763个任务),处于PENDING状态,但是系统没有处理任何东西......有可能重新执行这个任务给芹菜task_id吗?

python distributed celery celery-task

5
推荐指数
1
解决办法
2523
查看次数

收到未注册的类型任务

我正在尝试运行内存中的任务.在工人上注册的任务

[2012-09-13 11:10:18,928: WARNING/PoolWorker-1] [u'B.run', u'M1.run', u'M11.run', u'M22.run', u'M23.run', u'M24.run', u'M25.run', u'M26.run', u'M4.run', u'celery.backend_cleanup', u'celery.chain', u'celery.chord', u'celery.chord_unlock', u'celery.chunks', u'celery.group', u'celery.map', u'celery.starmap', u'impmod.run', u'initializerNew.run']
Run Code Online (Sandbox Code Playgroud)

但它仍然会出错:

[2012-09-13 11:19:59,848: ERROR/MainProcess] Received unregistered task of type 'M24.run'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'retries': 0, 'task': 'M24.run', 'eta': None, 'args': [{'cnt': '3', …
Run Code Online (Sandbox Code Playgroud)

celery celery-task

5
推荐指数
1
解决办法
1115
查看次数

使用补丁在Python中模拟Celery任务调用

使用模拟返回值修补Celery任务调用将返回<Mock name='mock().get()' ...>而不是预期return_value定义的mock_task.get.return_value = "value".但是,模拟任务在我的单元测试中正常运行.

这是我修补Celery任务的单元测试:

def test_foo(self):

    mock_task = Mock()
    mock_task.get = Mock(return_value={'success': True})

    print mock_task.get() # outputs {'success': True}

    with patch('app.tasks.my_task.delay', new=mock_task) as mocked_task:
        foo()  # this calls the mocked task with an argument, 'input from foo'
        mock_tasked.assert_called_with('input from foo')  # works
Run Code Online (Sandbox Code Playgroud)

以下是正在测试的功能:

def foo():
    print tasks.my_task.delay  # shows a Mock object, as expected
    # now let's call get() on the mocked task:
    task_result = tasks.my_task.delay('input from foo').get()
    print task_result  # => …
Run Code Online (Sandbox Code Playgroud)

python mocking celery celery-task python-mock

5
推荐指数
1
解决办法
2853
查看次数

Celery - 通过命令行或HTTP请求激活任务

比方说,我的代码中有一个预定义的芹菜任务 my_proj.tasks.my_celery_task

我想通过命令行/ HTTP请求(而不是通过我的应用程序)激活任务.

我搜索了文件(看到花和卷曲选项),但没有一个真正好的例子,在那里调用预定义的任务.怎么做到这一点?

python scheduled-tasks celery celery-task flower

5
推荐指数
2
解决办法
3383
查看次数

如何将芹菜的所有日志发送到自定义处理程序。就我而言python-logstash处理程序

在我的Celery应用程序中,我在控制台上获得了两种日志类型,即celery应用程序日志和任务级别日志(在任务内部,我使用logger.INFO(str)语法进行记录)

我想将它们都发送到自定义处理程序(在我的情况下为python-logstash处理程序)

对于django日志,我通过在settings.py中设置处理程序和记录器而成功完成了,但是我对芹菜无奈

python logging celery-task django-celery logstash

5
推荐指数
1
解决办法
1242
查看次数

使用Celery在部分任务中使用位置参数的链组

我正在编写一个应用程序,它将异步执行一组多个同步任务链.

换句话说,我可能有foo(a,b,c) -> boo(a,b,c)一些列表的管道bs.

我的理解是foo(a,b,c) | boo(a,b,c)为此列表中的每个b 创建一个链.然后这些链形成一个芹菜组,可以异步应用.

我的代码如下:

my_app.py

#!/usr/bin/env python3

import functools
import time

from celery import chain, group, Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def foo(a, b, c):
    logger.info("foo from {0}!".format(b))
    return b

@app.task
def boo(a, b, c):
    logger.info("boo from {0}!".format(b))
    return b

def break_up_tasks(tasks):
    try:
        first_task, *remaining_tasks = tasks
    except ValueError as e:
        first_task, remaining_tasks = [], []
    return first_task, remaining_tasks

def …
Run Code Online (Sandbox Code Playgroud)

python asynchronous celery celery-task python-3.x

5
推荐指数
1
解决办法
652
查看次数

Celery任务在发送给rabbitmq经纪人后丢失了

我遇到一个奇怪的问题,我的一些任务在发送给经纪人后会被删除.这种情况发生在大约十分之一的任务中.我已经检查过没有老芹菜工人在完成任务.

我使用数据库支持的后端和花来监视丢失的任务,但在apply_async之后返回的task_id不存在于数据库或花中.它的状态总是待定.

然后我用celery信号找出发生了什么.我发现对于丢失的任务,只触发before_task_publish和after_task_publish信号.发布此任务没有任何痕迹.

这些是我的信号

@before_task_publish.connect
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
  logger.info("BEFORE TASK SENT  id:"+body['id'])



@after_task_publish.connect
def after_task_publish_handler(sender=None, headers=None, body=None, exchange=None, routing_key=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
    logger.info("AFTER TASK SENT  id:"+body['id'])


@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
  logger.info("TASK PRERUN with TASK_ID:"+str(task_id))
Run Code Online (Sandbox Code Playgroud)

这是我在日志中发现的

$ cat gunicorn-access.log | grep …
Run Code Online (Sandbox Code Playgroud)

rabbitmq celery celery-task

5
推荐指数
1
解决办法
710
查看次数