标签: celery-task

芹菜 - 在链子里面的小组

我想在链中使用一个组(或块),例如:

chain(getRange.s(3),  GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)

GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题但是没有解释如何将组中的输出传递给链中的下一个任务.

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

7
推荐指数
1
解决办法
3772
查看次数

创建芹菜任务的不同方法之间的区别

通过查看创建芹菜任务的不同方法,我感到非常困惑。从表面上看,它们都是一样的,所以,有人可以解释一下两者之间的区别是什么。

1。

from myproject.tasks import app

@app.task
def foo():
    pass
Run Code Online (Sandbox Code Playgroud)

2。

from celery import task

@task
def foo():
    pass
Run Code Online (Sandbox Code Playgroud)

3。

from celery import shared_task

@shared_task
def foo():
    pass
Run Code Online (Sandbox Code Playgroud)

通过一点点谷歌搜索,我知道shared_task当没有具体的应用程序实例时,将使用第一和第三之间的区别。有人可以详细说明一下,什么时候使用第二个?

python django celery celery-task django-celery

7
推荐指数
1
解决办法
530
查看次数

如何在 django 上测试时禁用 celery 任务

我正在尝试测试一些包括调用 celery 任务的函数。任务涉及调用 3rd 方网站,我需要在测试期间避免它。

知道如何在测试期间禁用所有 celery 任务吗?

django celery-task django-celery

6
推荐指数
2
解决办法
1502
查看次数

Celery - “WorkerLostError:Worker 过早退出:信号 11(SIGSEGV)”

我正在本地开发一个 Django 应用程序它需要将 CSV 文件作为输入并对该文件运行一些分析。我在本地运行 Celery、RabbitMQ 和 Web 服务器。当我导入文件时,我在 Celery 服务器上看到以下错误:

[2015-12-11 16:58:53,906: WARNING/MainProcess] celery@Joes-MBP ready.
[2015-12-11 16:59:11,068: ERROR/MainProcess] Task program_manager.tasks.analyze_list_import_program[db22de16-b92f-4220-b2bd-5accf484c99a] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/joefusaro/rl_proto2/venv/lib/python2.7/site-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
Run Code Online (Sandbox Code Playgroud)

我不知道如何进一步解决这个问题;如果有帮助,我已经从 program_manager/tasks.py 复制了相关代码:

from __future__ import absolute_import

import csv
import rollbar
from celery import shared_task
from celery.utils.log import get_task_logger

from qscore.models import QualityScore
from integrations.salesforce.prepare import read_csv
from …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task django-celery djcelery

6
推荐指数
1
解决办法
5649
查看次数

Celery(Django + Redis)任务失败:“无法建立连接,因为目标机器主动拒绝”

更新:为了简单起见,我决定尝试使用 Django 作为代理,因为我假设我在 Redis 设置中做错了什么。但是,在进行文档中描述的更改后,当尝试使用.delay(). Celery 工作线程启动并显示它已连接到 Django 以进行传输。这可能是防火墙问题吗?

\n

原来的

\n

我正在开发一个 Django 项目并尝试添加后台任务。我已经安装了 Celery 并选择了 Redis 作为代理,并且也安装了它(我在 Windows 机器上,仅供参考)。celery Worker 启动,连接到 Redis 服务器,并发现我的shared_tasks

\n
 -------------- celery@GALACTICA v3.1.19 (Cipater)\n---- **** -----\n--- * ***  * -- Windows-7-6.1.7601-SP1\n-- * - **** ---\n- ** ---------- [config]\n- ** ---------- .> app:         proj:0x2dbf970\n- ** ---------- .> transport:   redis://localhost:6379/0\n- ** ---------- .> results:     disabled\n- *** --- * --- .> concurrency: 8 (prefork)\n-- ******* ----\n--- ***** ----- [queues]\n -------------- .> …
Run Code Online (Sandbox Code Playgroud)

python django redis celery celery-task

6
推荐指数
2
解决办法
6062
查看次数

在 Celery 中重复的任务

几天后,我的芹菜服务将无限期地重复一项任务。这有点难以重现,但每周定期发生一次或更频繁,具体取决于正在处理的任务量。

我将感谢有关如何获取有关此问题的更多数据的任何提示,因为我不知道如何跟踪它。出现时,重启celery会暂时解决。

我有一个 celery 节点运行 4 个工人(版本 3.1.23)。代理和结果后端在 Redis 上。我只发布到一个队列,我不使用 celery beat。

Django 的 setting.py 中的配置是:

BROKER_URL = 'redis://localhost:6380'
CELERY_RESULT_BACKEND = 'redis://localhost:6380'
Run Code Online (Sandbox Code Playgroud)

日志的相关部分:

[2016-05-28 10:37:21,957: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
[2016-05-28 11:37:58,005: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
[2016-05-28 13:37:59,147: INFO/MainProcess] Received task: painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647]
...
[2016-05-30 09:27:47,136: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 53.33468166703824s: None
[2016-05-30 09:43:08,317: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 466.0324719119817s: None
[2016-05-30 09:57:25,550: INFO/MainProcess] Task painel.tasks.indicar_cliente[defc87bc-5dd5-4857-9e45-d2a43aeb2647] succeeded in 642.7634702899959s: None
Run Code Online (Sandbox Code Playgroud)

任务由用户请求发送:

tasks.indicar_cliente.delay(indicacao_db.id)
Run Code Online (Sandbox Code Playgroud)

这是任务源代码celery 服务配置

为什么在服务运行一段时间后会多次收到任务?我怎样才能获得一致的行为?

python celery celery-task

6
推荐指数
2
解决办法
2291
查看次数

Django + Celery在多个工作节点上执行任务

我在同一个VM上部署了一个django(1.10)+ celery(4.x),rabbitmq作为代理(在同一台机器上).我想在多节点架构上开发相同的应用程序,就像我可以复制许多工作节点,并将任务扩展为快速运行.这里,

  1. 如何使用rabbitmq为这种架构配置芹菜?
  2. 在其他工作节点上,应该设置什么?

python architecture django celery celery-task

6
推荐指数
1
解决办法
1512
查看次数

如何在 Celery Worker 中正确连接 mongodb?

学习 Celery,阅读Celery 最佳实践,并有一个关于 Celery 数据库使用的非常简单的问题。

德尼·贝尔托维奇 说:

您不应将数据库对象(例如您的用户模型)传递给后台任务,因为序列化对象可能包含过时的数据。

那么,如果我想连接到工作人员中的数据库,正确的选择是什么:

@app.task
def add(x, y, collection):
    client = MongoClient('mongodb://localhost:27017/')
    db = client.wakawaka
    db[collection].insert_one({'sum':x+y})
    return True
Run Code Online (Sandbox Code Playgroud)

或者:

client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka

@app.task
def add(x, y, collection):
    db[collection].insert_one({'sum':x+y})
    return True
Run Code Online (Sandbox Code Playgroud)

UPD:我可以close()在每个任务结束时连接 mongodb,因此每次我需要某些东西时,任务都会连接到新的数据库,并且不会浪费资源。不过,我需要多次打开/关闭数据库连接吗?或者我可以连接一次并以某种方式刷新连接以检索新版本的数据库?

python mongodb celery pymongo celery-task

6
推荐指数
1
解决办法
4352
查看次数

Django 模型上的 Celery 任务

我正在尝试学习如何使用 celery 在我的一个模型上每天检查日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。

模型很大,所以我要发布一个精简版。我想我有两个选择。要么在模型方法上运行 celery 任务,要么在我的 tasks.py 中重写该函数。然后我需要使用 Celery beat 来运行日程表来每天检查。

我有这个功能可以工作,但我直接传递了我认为错误的模型对象。

我也遇到了如何在 celery.py 中的 celery beat 调度程序中使用 args 的问题。

我真的很接近让这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何完成它。

模型.py

class CarrierCompany(models.Model):
    name = models.CharField(max_length=255, unique=True)
    insurance_expiration = models.DateTimeField(null=True)
    insurance_active = models.BooleanField()

    def insurance_expiration_check(self):
        if self.insurance_expiration > datetime.today().date():
            self.insurance_active = True
            self.save()
            print("Insurance Active")
        else:
            self.insurance_active = False
            self.save()
            print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)

任务.py

from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany



@task(name="insurance_expired")
def insurance_date():
    carriers …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task celerybeat

6
推荐指数
1
解决办法
4849
查看次数

如何向特定的worker注册Celery任务?

我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。

我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。

celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)

一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。

但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。

以下是我启动时工作人员的输出。

$ celery -A proj worker -Q A -c 1 -l INFO

 -------------- celery@pet_sms v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f298a10d208
- ** ---------- .> transport:   redis://localhost:6379/0
- ** …
Run Code Online (Sandbox Code Playgroud)

celery celery-task django-celery celerybeat

6
推荐指数
1
解决办法
2738
查看次数