我正在尝试理解这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并将任务分发到多个节点上。
-> Task2 -> Task3
Task1 -> Task2 -> Task3 [then] Task4
-> Task2 -> Task3
Run Code Online (Sandbox Code Playgroud)
解释:
那么问题是,我该如何用芹菜做到这一点?
我不确定Python 中 Celery 库之间的区别,特别是在使用任务池时apply_async()。send_task()我主要寻找的是拥有一个集中式服务器,其中定义了所有任务,并使用一些 Redis 队列,其他物理实例能够使用池将事件发布/发送到针对不同任务的不同队列,而无需定义每个发布者脚本中能够使用的所有任务列表。
我查看了文档,但实际上没有找到太多可以回答我的问题的内容。
提前致谢!
那么,如何正确运行异步任务来达到目标呢?
这是什么RuntimeError: await wasn't used with future(如下),我该如何修复它?
我已经尝试过:
async_to_sync(来自 asgiref https://pypi.org/project/asgiref/)。
此选项可以运行 asyncio 协程,但重试功能不起作用。
(https://pypi.org/project/celery-pool-asyncio/)
与 asgiref 中的问题相同。(此选项可以运行 asyncio 协程,但重试功能不起作用。)
我已经尝试创建自己的装饰器,例如运行协程线程安全(asyncio.run_coroutine_threadsafe)的 async_to_sync ,但我的行为如上所述。
我还尝试asyncio.run()或asyncio.get_event_loop().run_until_complete()(和self.retry(...))内部芹菜任务。这运行良好,任务运行,重试有效,但是协程执行不正确 - 在async函数内部我无法使用 aioredis。
实施注意事项:
celery -A celery_test.celery_app worker -l info -n worker1 -P gevent … File "C:\Users\Codertjay\PycharmProjects\Teems_App_Kid\teems_app_kid\__init__.py", line 5, in <module>
from .celery import app as celery_app File "C:\Users\Codertjay\PycharmProjects\Teems_App_Kid\teems_app_kid\celery.py", line 3, in <module>
from celery import Celery File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\five.py", line 306, in __getattr__
module = __import__(self._object_origins[name], None, None, [name]) File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\app\__init__.py", line 14, in <module>
from celery import _state File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\_state.py", line 20, in <module>
from celery.utils.threads import LocalStack File "C:\Users\Codertjay\PycharmProjects\brownie\Teems_App_Kid\lib\site-packages\celery\utils\__init__.py", line 20, in <module>
from collections import Callable ImportError: cannot import name 'Callable' from 'collections' (C:\Users\Codertjay\AppData\Local\Programs\Python\Python310\lib\collections\__init__.py)
Run Code Online (Sandbox Code Playgroud) 我在一个项目中使用Python和芹菜.在项目中,我有两个文件:
celeryconfig.py
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
Run Code Online (Sandbox Code Playgroud)
和example.py
from celery.task import task
import hashlib
md5 = hashlib.md5()
@task
def getDigest(text):
print 'Using md5 - ',md5
md5.update(text)
return md5.digest()
Run Code Online (Sandbox Code Playgroud)
在celeryconfig.py中,我将CELERYD_CONCURRENCY设置为2,这意味着它会将我的任务队列中的任务分配给2个不同的进程.
从Python控制台,我运行:
from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')
Run Code Online (Sandbox Code Playgroud)
这会创建两个由两个工作人员同时执行的任务.问题是,当两个工作进程都运行它们的任务函数[ getDigest() ]时,它们似乎使用相同的哈希对象(md5).celeryd的输出确认了这一点,如下所示.
[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870> …Run Code Online (Sandbox Code Playgroud) 有没有办法在Celery Broadcast任务中获得每个工作人员的所有结果?我想监控所有工人的一切是否正常.还将感谢任务发送到的工作者列表.
我正在运行一个Django网站并且刚刚让Celery运行,但是我遇到了令人困惑的错误.以下是代码的结构.
在tests.py中:
from tasks import *
from celery.result import AsyncResult
project = Project.objects.create()
# initalize various sub-objects of the project
c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done
while not r and not f:
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
self.assertEqual() #will fail because task fails
Run Code Online (Sandbox Code Playgroud)
在tasks.py中:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def function(project_id)
#a bunch of calculations followed by a save of the project
project = Project.objects.get(project=project_id) …Run Code Online (Sandbox Code Playgroud) 我有一个python celery-redis队列处理一次上传和下载值得一次演出和大量演出的数据。
很少有上传内容需要花费几个小时的时间。但是,一旦完成了这样的任务,我就目睹了这种奇怪的芹菜行为,即芹菜调度程序通过将其再次发送给工作人员(我正在运行一个工作人员)来重新运行刚刚完成的任务,并且它在同一时间发生了2次任务!
有人可以帮助我知道为什么会发生这种情况以及如何预防吗?
这些任务肯定干净地完成了,没有错误报告,因为这些任务运行时间非常长。
如何检查芹菜执行的功能?
def notification():
# in_celery() returns True if called from celery_test(),
# False if called from not_celery_test()
if in_celery():
# Send mail directly without creation of additional celery subtask
...
else:
# Send mail with creation of celery task
...
@celery.task()
def celery_test():
notification()
def not_celery_test():
notification()
Run Code Online (Sandbox Code Playgroud) 我有celery 4.1.0的django 1.11.5应用程序,我一直都收到了:
kombu.exceptions.EncodeError: <User: testuser> is not JSON serializable
Run Code Online (Sandbox Code Playgroud)
我的settings.py:
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_RESULT_BACKEND = 'amqp://localhost'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Makassar'
CELERY_BEAT_SCHEDULE = {}
Run Code Online (Sandbox Code Playgroud)
task.py
from __future__ import absolute_import, unicode_literals
from celery import task
from django.contrib.auth.models import User
@task(serializer='json')
def task_number_one():
user = User.objects.create(username="testuser", email="test@test.com", password="pass")
return user
Run Code Online (Sandbox Code Playgroud)
在视图中我调用任务:
def form_valid(self, form):
form.instance.user = self.request.user
task_number_one.delay()
return super().form_valid(form)
Run Code Online (Sandbox Code Playgroud) celery ×10
celery-task ×10
python ×9
python-3.x ×4
django ×3
celeryd ×2
aioredis ×1
flask ×1
json ×1
redis ×1