我正在对 celery 任务进行单元测试。我的连锁任务也有组,所以产生了一个和弦。
测试应该如下所示:
我尝试了以下方法:
def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)
Run Code Online (Sandbox Code Playgroud)
这会造成死锁,chord_unlock 会永远重试。我对任务结果不感兴趣。如何等待所有子任务完成?
在 下supervisor,celerybeat 为我的 Django 应用程序的 celery 工作人员提供定期任务。我有 4 个任务,task1、task2、task3和task4。最近我做了第五个任务:task5。
我的问题是我从我的工作人员中注释掉了它task5,从 settings.py 中删除了它的提及并重新启动了 celerybeat 和我的 celery 工作人员。但我仍然看到task5定期出现(自然地在工人日志中抛出错误)。
为什么会发生这种情况,如何更新定期任务?
在 settings.py 中,我有:
import djcelery
djcelery.setup_loader()
# config settings for Celery Daemon
# Redis broker
BROKER_URL = 'redis://localhost:6379/0'
BROKER_TRANSPORT = 'redis'
# List of modules to import when celery starts, in myapp.tasks form.
CELERY_IMPORTS = ('myapp.tasks', )
CELERY_ALWAYS_EAGER = False
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
#The backend is the …Run Code Online (Sandbox Code Playgroud) 大家好,我是 celery 和 python 的新手。我正在使用rabbitmq-server 创建一个简单的任务。但我不知道如何在 python 中使用 celerybeat 实现周期性任务。我搜索但每个地方我都会用 django 定期执行任务。
我使用此代码作为tasks.py: from celery import Celery from time import strftime
app = Celery('tasks',broker='pyamqp://guest@localhost//')
@app.task
def show_time():
return strftime('%Y-%m-%d %H:%M:%S')
Run Code Online (Sandbox Code Playgroud)
运行任务.py:
from tasks import show_time
show_time.delay()
Run Code Online (Sandbox Code Playgroud)
谢谢你的时间。
Celery 似乎是一个很棒的工具,但我很难理解各个 Celery 组件如何协同工作:
据我了解,命令行:
celery -A not-clear-what-this-option-is worker
Run Code Online (Sandbox Code Playgroud)
应该运行某种芹菜“工作服务器”,它本身需要连接到代理服务器(我不太确定为什么需要这么多服务器)。
然后在任何 python 代码中,可以通过实例化应用程序将某些任务发送给工作人员:
app = Celery('my_module', broker='pyamqp://guest@localhost//')
Run Code Online (Sandbox Code Playgroud)
然后通过以下方式使用此应用程序装饰函数:
@app.tasks
def my_func():
...
Run Code Online (Sandbox Code Playgroud)
这样“my_func()”现在可以被称为“my_func.delay()”以异步方式运行。
这是我的问题:
如果有人有关于这些问题的一些信息,我将不胜感激。我打算在 Django 项目中使用 Celery,其中对服务器的一些请求可以触发各种耗时的任务,和/或查询先前启动的任务的状态(待处理、已完成、错误等)。
我正在使用 celery 4.1,除了我在 crontab 任务中设置时间的地方之外,所有定期任务都可以正常工作。我认为这与时区设置有关,但我似乎无法弄清楚问题出在哪里。
仪表板/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('dashboard',
broker='redis://',
backend='redis://localhost',
include=['dashboard.tasks'])
app.conf.update(
result_expires=3600,
enable_utc = False,
timezone = 'America/New_York'
)
if __name__ == '__main__':
app.start()
Run Code Online (Sandbox Code Playgroud)
这有效:
@app.task
@periodic_task(run_every=(crontab()))
def shutdown_vms():
inst = C2CManage(['stop','kube'])
inst.run()
return
Run Code Online (Sandbox Code Playgroud)
这有效:
@app.task
@periodic_task(run_every=(crontab(minute=30,hour='*')))
def shutdown_vms():
inst = C2CManage(['stop','kube'])
inst.run()
return
Run Code Online (Sandbox Code Playgroud)
这不起作用:
@app.task
@periodic_task(run_every=(crontab(minute=30,hour=6)))
def shutdown_vms():
inst = C2CManage(['stop','kube'])
inst.run()
return
Run Code Online (Sandbox Code Playgroud)
Beat 很好地完成了任务:
<ScheduleEntry: dashboard.tasks.shutdown_vms dashboard.tasks.shutdown_vms() <crontab: 30 6 * * * (m/h/d/dM/MY)>
Run Code Online (Sandbox Code Playgroud)
但它从不发送它。我已经让进程运行了一个周末,但它从未提交任务。我不知道我做错了什么。我确实有其他按 timedelta 周期运行的任务,它们都运行得很好。 …
我的芹菜任务有一个基类,其中on_failure实现了一个方法。
在我的测试中,我修补了任务调用的方法之一,以引发异常但从on_faliure未被调用。
基类
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
Run Code Online (Sandbox Code Playgroud)
任务
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
Run Code Online (Sandbox Code Playgroud)
测试
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
Run Code Online (Sandbox Code Playgroud)
当运行 celery 并引发异常时,一切正常。
CELERY_ALWAYS_EAGER被激活。
我怎样才能on_faliure跑?
我正在尝试将视频文件上传到 s3,但在使用 celery 放入任务队列后。当视频上传时,用户可以做其他事情。
我的views.py调用celery任务
def upload_blob(request, iterator, interview_id, candidate_id, question_id):
try:
interview_obj = Interview.objects.get(id=interview_id)
except ObjectDoesNotExist:
interview_obj = None
current_interview = interview_obj
if request.method == 'POST':
print("inside POST")
# newdoc1 = Document(upload=request.FILES['uploaded_video'], name="videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
# newdoc1.save()
save_document_model.delay(request.FILES['uploaded_video'],"videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
# newdoc2 = Document(upload=request.FILES['uploaded_audio'], name="audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
# newdoc2.save()
save_document_model.delay(request.FILES['uploaded_audio'],"audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
iterator = str(int(iterator) + 1)
return HttpResponseRedirect(reverse('candidate:show_question', kwargs={'iterator': iterator,'interview_id':current_interview.id,'question_id':question_id}))
else:
return render(request, 'candidate/record_answer.html')
Run Code Online (Sandbox Code Playgroud)
实际的芹菜任务.py
@task(name="save_document_model")
def save_document_model(uploaded_file, file_name):
newdoc = Document(upload=uploaded_file, name=file_name)
newdoc.save()
logger.info("document saved successfully")
return HttpResponse("document saved successfully")
Run Code Online (Sandbox Code Playgroud)
文档模型
def upload_function(instance, filename):
getname …Run Code Online (Sandbox Code Playgroud) 我有一个当前使用 asyncio 编写的后端应用程序:用于 Web 服务器的 fastapi,用于异步数据库驱动程序的 sqlalchemy 1.4 + asyncpg。我需要将任务部署给将运行和更新主机应用程序的工作人员。目前我正在使用aio_pika,但想要更强大的东西,例如celerywith flower。
我知道 celery 没有与 asyncio 集成。我也读过这样的答案,我担心的是任务不是异步的,这是微不足道的。我担心从主事件循环内启动任务。
我的主要问题是my_task.delay()/是否完全my_task.apply_async()阻塞正在运行的线程?如果是这样,更好的方法是使用来自中央或 a的多处理工作人员,然后仅从该工作人员进程部署 celery 任务吗?getmp.QueueProcessPoolExecutor
我想要部署任务,并且最好在任务完成时收到通知。不过,这可以通过界面在任务本身内完成fastapi。我只是想确保部署任务不会阻止异步事件循环。
因此,我们有一个 kubernetes 集群,运行一些带有 celery 工作线程的 pod。我们使用 python3.6 来运行这些工作程序,celery 版本是 3.1.2(我知道,真的很旧,我们正在努力升级它)。我们还设置了一些自动缩放机制来动态添加更多芹菜工作人员。
问题如下。假设在任何给定时间我们都有 5 个工人。然后会出现大量任务,从而增加 Pod 的 CPU/RAM 使用率。这会触发一个自动缩放事件,比方说,添加两个 celery 工作单元。所以现在这两个新的 celery 工人承担了一些长时间运行的任务。在完成运行这些任务之前,kubernetes 会创建一个缩减事件,杀死这两个工作人员,并杀死那些长时间运行的任务。
此外,由于遗留原因,如果任务未完成,我们没有重试机制(并且我们现在无法实现)。
所以我的问题是,有没有办法告诉 kubernetes 等待 celery 工作线程运行完所有待处理的任务?我想解决方案必须包括某种方法来通知芹菜工作人员以使其停止接收新任务。现在我知道 Kubernetes 有一些脚本可以处理这种情况,但我不知道在这些脚本上写什么,因为我不知道如何让 celery Worker 停止接收任务。
任何想法?
我正在使用 celery 和 fastAPI。
获取无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任内容 (application/json)')在 docker 中运行时,在没有 docker 的本地计算机上运行相同的内容时,没有问题。
其配置如下。
celery_app = Celery('cda-celery-tasks',
broker=CFG.BROKER_URL,
backend=CFG.BACKEND_URL,
include=['src.tasks.tasks']
)
celery_app.conf.task_serializer = 'pickle'
celery_app.conf.result_serializer = 'pickle'
celery_app.conf.accept_content = ['pickle']
celery_app.conf.enable_utc = True
Run Code Online (Sandbox Code Playgroud)
在 docker 中运行时,我不断收到错误
FROM python:3.8
WORKDIR /app
COPY . .
RUN pip3 install poetry
ENV PATH="/root/.poetry/bin:$PATH"
RUN poetry install
Run Code Online (Sandbox Code Playgroud)
使用 kubernetes 中的以下命令启动 celery。
poetry run celery -A src.infrastructure.celery_application worker --loglevel=INFO --concurrency 2
运行时我不断收到错误
无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任内容 (application/json)')
body: '{"method": "enable_events", "arguments": {}, "destination": null, "pattern": null, …Run Code Online (Sandbox Code Playgroud) celery-task ×10
celery ×8
python ×6
celerybeat ×2
django ×2
kubernetes ×2
amazon-s3 ×1
asynchronous ×1
chord ×1
docker ×1
file-upload ×1
python-2.7 ×1
python-3.x ×1
supervisord ×1
unit-testing ×1