标签: celery-task

Celery 将并行任务链接成和弦

我正在尝试理解这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并将任务分发到多个节点上。

       ->  Task2  ->  Task3
Task1  ->  Task2  ->  Task3    [then]    Task4
       ->  Task2  ->  Task3
Run Code Online (Sandbox Code Playgroud)

解释

  • Task1 生成一个项目列表
  • Task2 从 Task1 接收一项作为参数
  • Task2 和 Task3 是链式的,并且每个链都是并行执行的
  • 当所有 Task2-Task3 链完成时执行 Task4(不需要从 Task3 传递任何数据)

那么问题是,我该如何用芹菜做到这一点?

python redis celery celery-task python-3.x

4
推荐指数
1
解决办法
1611
查看次数

send_task() 和 apply_async() 之间的区别

我不确定Python 中 Celery 库之间的区别,特别是在使用任务池时apply_async()send_task()我主要寻找的是拥有一个集中式服务器,其中定义了所有任务,并使用一些 Redis 队列,其他物理实例能够使用池将事件发布/发送到针对不同任务的不同队列,而无需定义每个发布者脚本中能够使用的所有任务列表。

我查看了文档,但实际上没有找到太多可以回答我的问题的内容。

提前致谢!

python celery celery-task python-3.x

4
推荐指数
1
解决办法
3225
查看次数

如何在 celery 任务中使用 asyncio 和 aioredis 锁?

目标:

  1. 可以运行 asyncio 协程。
  2. 纠正异常和任务重试时的 celery 行为。
  3. 可以使用 aioredis 锁。

那么,如何正确运行异步任务来达到目标​​呢?

这是什么RuntimeError: await wasn't used with future(如下),我该如何修复它?


我已经尝试过:

1. 阿斯吉里夫

async_to_sync(来自 asgiref https://pypi.org/project/asgiref/)。

此选项可以运行 asyncio 协程,但重试功能不起作用。

2. 芹菜池异步

https://pypi.org/project/celery-pool-asyncio/

与 asgiref 中的问题相同。(此选项可以运行 asyncio 协程,但重试功能不起作用。)

3.编写自己的异步同步装饰器

我已经尝试创建自己的装饰器,例如运行协程线程安全(asyncio.run_coroutine_threadsafe)的 async_to_sync ,但我的行为如上所述。

4. 异步模块

我还尝试asyncio.run()asyncio.get_event_loop().run_until_complete()(和self.retry(...))内部芹菜任务。这运行良好,任务运行,重试有效,但是协程执行不正确 - 在async函数内部我无法使用 aioredis。

实施注意事项

  • 启动芹菜命令celery -A celery_test.celery_app worker -l info -n worker1 -P gevent …

celery celery-task python-3.x python-asyncio aioredis

4
推荐指数
1
解决办法
5219
查看次数

将 Python 版本 3.6 更改为 3.10 后,我无法从“集合”导入名称“可调用”

 File "C:\Users\Codertjay\PycharmProjects\Teems_App_Kid\teems_app_kid\__init__.py", line 5, in <module>
    from .celery import app as celery_app   File "C:\Users\Codertjay\PycharmProjects\Teems_App_Kid\teems_app_kid\celery.py", line 3, in <module>
    from celery import Celery   File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\five.py", line 306, in __getattr__
    module = __import__(self._object_origins[name], None, None, [name])   File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\app\__init__.py", line 14, in <module>
    from celery import _state   File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\_state.py", line 20, in <module>
    from celery.utils.threads import LocalStack   File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\utils\__init__.py", line 20, in <module>
    from collections import Callable ImportError: cannot import name 'Callable' from 'collections' (C:\Users\Codertjay\AppData\Local\Programs\Python\Python310\lib\collections\__init__.py)
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task python-3.x

4
推荐指数
1
解决办法
8927
查看次数

芹菜工人变量分享问题

我在一个项目中使用Python和芹菜.在项目中,我有两个文件:

celeryconfig.py

BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
Run Code Online (Sandbox Code Playgroud)

example.py

from celery.task import task
import hashlib

md5 = hashlib.md5()

@task
def getDigest(text):
    print 'Using md5 - ',md5
    md5.update(text)
    return md5.digest()
Run Code Online (Sandbox Code Playgroud)

celeryconfig.py中,我将CELERYD_CONCURRENCY设置为2,这意味着它会将我的任务队列中的任务分配给2个不同的进程.

从Python控制台,我运行:

from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')
Run Code Online (Sandbox Code Playgroud)

这会创建两个由两个工作人员同时执行的任务.问题是,当两个工作进程都运行它们的任务函数[ getDigest() ]时,它们似乎使用相同的哈希对象(md5).celeryd的输出确认了这一点,如下所示.

[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870> …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing celery celery-task celeryd

3
推荐指数
1
解决办法
3275
查看次数

让芹菜广播所有工人的回报结果

有没有办法在Celery Broadcast任务中获得每个工作人员的所有结果?我想监控所有工人的一切是否正常.还将感谢任务发送到的工作者列表.

python celery celery-task

3
推荐指数
1
解决办法
854
查看次数

芹菜结果错误"args必须是列表或元组"

我正在运行一个Django网站并且刚刚让Celery运行,但是我遇到了令人困惑的错误.以下是代码的结构.

在tests.py中:

from tasks import *
from celery.result import AsyncResult

project = Project.objects.create()
# initalize various sub-objects of the project

c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done  
while not r and not f:
    r = AsyncResult(c.id).ready()
    f = AsyncResult(c.id).failed()

self.assertEqual() #will fail because task fails
Run Code Online (Sandbox Code Playgroud)

在tasks.py中:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def function(project_id)
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id) …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task celeryd

3
推荐指数
1
解决办法
4561
查看次数

Celery反复运行长时间运行的已完成任务

我有一个python celery-redis队列处理一次上传和下载值得一次演出和大量演出的数据。

很少有上传内容需要花费几个小时的时间。但是,一旦完成了这样的任务,我就目睹了这种奇怪的芹菜行为,即芹菜调度程序通过将其再次发送给工作人员(我正在运行一个工作人员)来重新运行刚刚完成的任务,并且它在同一时间发生了2次任务!

有人可以帮助我知道为什么会发生这种情况以及如何预防吗?

这些任务肯定干净地完成了,没有错误报告,因为这些任务运行时间非常长。

python celery celery-task

3
推荐指数
1
解决办法
950
查看次数

检查是否在芹菜任务

如何检查芹菜执行的功能?

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...

@celery.task()
def celery_test():
    notification()

def not_celery_test():
    notification()
Run Code Online (Sandbox Code Playgroud)

python celery flask celery-task

3
推荐指数
1
解决办法
1439
查看次数

kombu.exceptions.EncodeError:用户不可JSON序列化

我有celery 4.1.0的django 1.11.5应用程序,我一直都收到了:

kombu.exceptions.EncodeError: <User: testuser> is not JSON serializable
Run Code Online (Sandbox Code Playgroud)

我的settings.py:

CELERY_BROKER_URL = 'amqp://localhost'
CELERY_RESULT_BACKEND = 'amqp://localhost'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Makassar'
CELERY_BEAT_SCHEDULE = {}
Run Code Online (Sandbox Code Playgroud)

task.py

from __future__ import absolute_import, unicode_literals
from celery import task
from django.contrib.auth.models import User


@task(serializer='json')
def task_number_one():
    user = User.objects.create(username="testuser", email="test@test.com", password="pass")
    return user
Run Code Online (Sandbox Code Playgroud)

在视图中我调用任务:

def form_valid(self, form):
    form.instance.user = self.request.user
    task_number_one.delay()
    return super().form_valid(form)
Run Code Online (Sandbox Code Playgroud)

python django json celery celery-task

3
推荐指数
2
解决办法
2602
查看次数