学习 Celery,阅读Celery 最佳实践,并有一个关于 Celery 数据库使用的非常简单的问题。
德尼·贝尔托维奇 说:
您不应将数据库对象(例如您的用户模型)传递给后台任务,因为序列化对象可能包含过时的数据。
那么,如果我想连接到工作人员中的数据库,正确的选择是什么:
@app.task
def add(x, y, collection):
client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka
db[collection].insert_one({'sum':x+y})
return True
Run Code Online (Sandbox Code Playgroud)
或者:
client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka
@app.task
def add(x, y, collection):
db[collection].insert_one({'sum':x+y})
return True
Run Code Online (Sandbox Code Playgroud)
?
UPD:我可以close()在每个任务结束时连接 mongodb,因此每次我需要某些东西时,任务都会连接到新的数据库,并且不会浪费资源。不过,我需要多次打开/关闭数据库连接吗?或者我可以连接一次并以某种方式刷新连接以检索新版本的数据库?
我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。
我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。
celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)
一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。
但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。
以下是我启动时工作人员的输出。
$ celery -A proj worker -Q A -c 1 -l INFO
-------------- celery@pet_sms v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x7f298a10d208
- ** ---------- .> transport: redis://localhost:6379/0
- ** …Run Code Online (Sandbox Code Playgroud) 我有一个Celery相当长的任务。超过几分钟。
有时,由于各种原因,一个工作人员被标记为终止,而另一个工作人员则开始工作。如果需要更换运行它的计算机,或者正在部署新的代码版本,则可能会发生这种情况。在这种情况下,工作线程会收到 SIGTERM 信号。
我想知道任务本身是否可以定期检查该工作线程是否已收到 SIGTERM 并且正在等待终止,在这种情况下,只需将任务放回队列中并终止即可。(然后该任务将在另一个工作人员上启动,并继续执行其工作)
编辑:澄清 - 是否可以在任务中检查它是否在等待终止的工作线程上执行。像这样:
# Some long task that can take even a few hours.
def some_task(...):
for i in range(...):
do_some_work()
# That's the missing function:
if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
# stop the task in the middle, and it will be executed again later
Run Code Online (Sandbox Code Playgroud) 我已经启动了很多任务,但其中一些还没有完成(763个任务),处于PENDING状态,但是系统没有处理任何东西......有可能重新执行这个任务给芹菜task_id吗?
我正在尝试运行内存中的任务.在工人上注册的任务
[2012-09-13 11:10:18,928: WARNING/PoolWorker-1] [u'B.run', u'M1.run', u'M11.run', u'M22.run', u'M23.run', u'M24.run', u'M25.run', u'M26.run', u'M4.run', u'celery.backend_cleanup', u'celery.chain', u'celery.chord', u'celery.chord_unlock', u'celery.chunks', u'celery.group', u'celery.map', u'celery.starmap', u'impmod.run', u'initializerNew.run']
Run Code Online (Sandbox Code Playgroud)
但它仍然会出错:
[2012-09-13 11:19:59,848: ERROR/MainProcess] Received unregistered task of type 'M24.run'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'retries': 0, 'task': 'M24.run', 'eta': None, 'args': [{'cnt': '3', …Run Code Online (Sandbox Code Playgroud) 使用模拟返回值修补Celery任务调用将返回<Mock name='mock().get()' ...>而不是预期return_value定义的mock_task.get.return_value = "value".但是,模拟任务在我的单元测试中正常运行.
这是我修补Celery任务的单元测试:
def test_foo(self):
mock_task = Mock()
mock_task.get = Mock(return_value={'success': True})
print mock_task.get() # outputs {'success': True}
with patch('app.tasks.my_task.delay', new=mock_task) as mocked_task:
foo() # this calls the mocked task with an argument, 'input from foo'
mock_tasked.assert_called_with('input from foo') # works
Run Code Online (Sandbox Code Playgroud)
以下是正在测试的功能:
def foo():
print tasks.my_task.delay # shows a Mock object, as expected
# now let's call get() on the mocked task:
task_result = tasks.my_task.delay('input from foo').get()
print task_result # => …Run Code Online (Sandbox Code Playgroud) 比方说,我的代码中有一个预定义的芹菜任务 my_proj.tasks.my_celery_task
我想通过命令行/ HTTP请求(而不是通过我的应用程序)激活任务.
我搜索了文件(看到花和卷曲选项),但没有一个真正好的例子,在那里调用预定义的任务.怎么做到这一点?
在我的Celery应用程序中,我在控制台上获得了两种日志类型,即celery应用程序日志和任务级别日志(在任务内部,我使用logger.INFO(str)语法进行记录)
我想将它们都发送到自定义处理程序(在我的情况下为python-logstash处理程序)
对于django日志,我通过在settings.py中设置处理程序和记录器而成功完成了,但是我对芹菜无奈
我正在编写一个应用程序,它将异步执行一组多个同步任务链.
换句话说,我可能有foo(a,b,c) -> boo(a,b,c)一些列表的管道bs.
我的理解是foo(a,b,c) | boo(a,b,c)为此列表中的每个b 创建一个链.然后这些链形成一个芹菜组,可以异步应用.
我的代码如下:
my_app.py
#!/usr/bin/env python3
import functools
import time
from celery import chain, group, Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def foo(a, b, c):
logger.info("foo from {0}!".format(b))
return b
@app.task
def boo(a, b, c):
logger.info("boo from {0}!".format(b))
return b
def break_up_tasks(tasks):
try:
first_task, *remaining_tasks = tasks
except ValueError as e:
first_task, remaining_tasks = [], []
return first_task, remaining_tasks
def …Run Code Online (Sandbox Code Playgroud) 我遇到一个奇怪的问题,我的一些任务在发送给经纪人后会被删除.这种情况发生在大约十分之一的任务中.我已经检查过没有老芹菜工人在完成任务.
我使用数据库支持的后端和花来监视丢失的任务,但在apply_async之后返回的task_id不存在于数据库或花中.它的状态总是待定.
然后我用celery信号找出发生了什么.我发现对于丢失的任务,只触发before_task_publish和after_task_publish信号.发布此任务没有任何痕迹.
这些是我的信号
@before_task_publish.connect
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
logger.info("BEFORE TASK SENT id:"+body['id'])
@after_task_publish.connect
def after_task_publish_handler(sender=None, headers=None, body=None, exchange=None, routing_key=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
logger.info("AFTER TASK SENT id:"+body['id'])
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
logger.info("TASK PRERUN with TASK_ID:"+str(task_id))
Run Code Online (Sandbox Code Playgroud)
这是我在日志中发现的
$ cat gunicorn-access.log | grep …Run Code Online (Sandbox Code Playgroud) celery-task ×10
celery ×9
python ×7
asynchronous ×1
celerybeat ×1
distributed ×1
flower ×1
logging ×1
logstash ×1
mocking ×1
mongodb ×1
pymongo ×1
python-3.x ×1
python-mock ×1
rabbitmq ×1