我想在链中使用一个组(或块),例如:
chain(getRange.s(3), GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)
GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题?但是没有解释如何将组中的输出传递给链中的下一个任务.
@task
def getRange(x):
return range(x)
@task
def double(nr):
return nr*2
@task
def xsum(list):
return sum(list)
Run Code Online (Sandbox Code Playgroud) 通过查看创建芹菜任务的不同方法,我感到非常困惑。从表面上看,它们都是一样的,所以,有人可以解释一下两者之间的区别是什么。
1。
from myproject.tasks import app
@app.task
def foo():
pass
Run Code Online (Sandbox Code Playgroud)
2。
from celery import task
@task
def foo():
pass
Run Code Online (Sandbox Code Playgroud)
3。
from celery import shared_task
@shared_task
def foo():
pass
Run Code Online (Sandbox Code Playgroud)
通过一点点谷歌搜索,我知道shared_task当没有具体的应用程序实例时,将使用第一和第三之间的区别。有人可以详细说明一下,什么时候使用第二个?
我正在尝试测试一些包括调用 celery 任务的函数。任务涉及调用 3rd 方网站,我需要在测试期间避免它。
知道如何在测试期间禁用所有 celery 任务吗?
我正在本地开发一个 Django 应用程序,它需要将 CSV 文件作为输入并对该文件运行一些分析。我在本地运行 Celery、RabbitMQ 和 Web 服务器。当我导入文件时,我在 Celery 服务器上看到以下错误:
[2015-12-11 16:58:53,906: WARNING/MainProcess] celery@Joes-MBP ready.
[2015-12-11 16:59:11,068: ERROR/MainProcess] Task program_manager.tasks.analyze_list_import_program[db22de16-b92f-4220-b2bd-5accf484c99a] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/joefusaro/rl_proto2/venv/lib/python2.7/site-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
Run Code Online (Sandbox Code Playgroud)
我不知道如何进一步解决这个问题;如果有帮助,我已经从 program_manager/tasks.py 复制了相关代码:
from __future__ import absolute_import
import csv
import rollbar
from celery import shared_task
from celery.utils.log import get_task_logger
from qscore.models import QualityScore
from integrations.salesforce.prepare import read_csv
from …Run Code Online (Sandbox Code Playgroud) 更新:为了简单起见,我决定尝试使用 Django 作为代理,因为我假设我在 Redis 设置中做错了什么。但是,在进行文档中描述的更改后,当尝试使用.delay(). Celery 工作线程启动并显示它已连接到 Django 以进行传输。这可能是防火墙问题吗?
原来的
\n我正在开发一个 Django 项目并尝试添加后台任务。我已经安装了 Celery 并选择了 Redis 作为代理,并且也安装了它(我在 Windows 机器上,仅供参考)。celery Worker 启动,连接到 Redis 服务器,并发现我的shared_tasks
-------------- celery@GALACTICA v3.1.19 (Cipater)\n---- **** -----\n--- * *** * -- Windows-7-6.1.7601-SP1\n-- * - **** ---\n- ** ---------- [config]\n- ** ---------- .> app: proj:0x2dbf970\n- ** ---------- .> transport: redis://localhost:6379/0\n- ** ---------- .> results: disabled\n- *** --- * --- .> concurrency: 8 (prefork)\n-- ******* ----\n--- ***** ----- [queues]\n -------------- .> …Run Code Online (Sandbox Code Playgroud) 几天后,我的芹菜服务将无限期地重复一项任务。这有点难以重现,但每周定期发生一次或更频繁,具体取决于正在处理的任务量。
我将感谢有关如何获取有关此问题的更多数据的任何提示,因为我不知道如何跟踪它。出现时,重启celery会暂时解决。
我有一个 celery 节点运行 4 个工人(版本 3.1.23)。代理和结果后端在 Redis 上。我只发布到一个队列,我不使用 celery beat。
Django 的 setting.py 中的配置是:
BROKER_URL = 'redis://localhost:6380'
CELERY_RESULT_BACKEND = 'redis://localhost:6380'
Run Code Online (Sandbox Code Playgroud)
日志的相关部分:
[2016-05-28 10:37:21,957: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
[2016-05-28 11:37:58,005: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
[2016-05-28 13:37:59,147: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
...
[2016-05-30 09:27:47,136: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 53.33468166703824s: None
[2016-05-30 09:43:08,317: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 466.0324719119817s: None
[2016-05-30 09:57:25,550: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 642.7634702899959s: None
Run Code Online (Sandbox Code Playgroud)
任务由用户请求发送:
tasks.indicar_cliente.delay(indicacao_db.id)
Run Code Online (Sandbox Code Playgroud)
这是任务的源代码和celery 服务配置。
为什么在服务运行一段时间后会多次收到任务?我怎样才能获得一致的行为?
我在同一个VM上部署了一个django(1.10)+ celery(4.x),rabbitmq作为代理(在同一台机器上).我想在多节点架构上开发相同的应用程序,就像我可以复制许多工作节点,并将任务扩展为快速运行.这里,
学习 Celery,阅读Celery 最佳实践,并有一个关于 Celery 数据库使用的非常简单的问题。
德尼·贝尔托维奇 说:
您不应将数据库对象(例如您的用户模型)传递给后台任务,因为序列化对象可能包含过时的数据。
那么,如果我想连接到工作人员中的数据库,正确的选择是什么:
@app.task
def add(x, y, collection):
client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka
db[collection].insert_one({'sum':x+y})
return True
Run Code Online (Sandbox Code Playgroud)
或者:
client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka
@app.task
def add(x, y, collection):
db[collection].insert_one({'sum':x+y})
return True
Run Code Online (Sandbox Code Playgroud)
?
UPD:我可以close()在每个任务结束时连接 mongodb,因此每次我需要某些东西时,任务都会连接到新的数据库,并且不会浪费资源。不过,我需要多次打开/关闭数据库连接吗?或者我可以连接一次并以某种方式刷新连接以检索新版本的数据库?
我正在尝试学习如何使用 celery 在我的一个模型上每天检查日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。
模型很大,所以我要发布一个精简版。我想我有两个选择。要么在模型方法上运行 celery 任务,要么在我的 tasks.py 中重写该函数。然后我需要使用 Celery beat 来运行日程表来每天检查。
我有这个功能可以工作,但我直接传递了我认为错误的模型对象。
我也遇到了如何在 celery.py 中的 celery beat 调度程序中使用 args 的问题。
我真的很接近让这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何完成它。
模型.py
class CarrierCompany(models.Model):
name = models.CharField(max_length=255, unique=True)
insurance_expiration = models.DateTimeField(null=True)
insurance_active = models.BooleanField()
def insurance_expiration_check(self):
if self.insurance_expiration > datetime.today().date():
self.insurance_active = True
self.save()
print("Insurance Active")
else:
self.insurance_active = False
self.save()
print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)
任务.py
from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany
@task(name="insurance_expired")
def insurance_date():
carriers …Run Code Online (Sandbox Code Playgroud) 我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。
我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。
celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)
一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。
但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。
以下是我启动时工作人员的输出。
$ celery -A proj worker -Q A -c 1 -l INFO
-------------- celery@pet_sms v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x7f298a10d208
- ** ---------- .> transport: redis://localhost:6379/0
- ** …Run Code Online (Sandbox Code Playgroud) celery-task ×10
celery ×9
python ×7
django ×5
celerybeat ×2
architecture ×1
djcelery ×1
mongodb ×1
pymongo ×1
redis ×1