标签: celery-task

Python celery - 如何等待和弦中的所有子任务

我正在对 celery 任务进行单元测试。我的连锁任务也有组,所以产生了一个和弦。

测试应该如下所示:

  • 运行 celery 任务(延迟)
  • 等待任务和所有子任务
  • 断言

我尝试了以下方法:

def wait_for_result(result):
    result.get()
    for child in result.children or list():
        if isinstance(child, GroupResult):
           # tried looping over task result in group
           # until tasks are ready, but without success 
           pass
        wait_for_result(child)
Run Code Online (Sandbox Code Playgroud)

这会造成死锁,chord_unlock 会永远重试。我对任务结果不感兴趣。如何等待所有子任务完成?

python chord celery celery-task

5
推荐指数
2
解决办法
1万
查看次数

Celery 击败了旧的(已删除的)任务

在 下supervisor,celerybeat 为我的 Django 应用程序的 celery 工作人员提供定期任务。我有 4 个任务,task1task2task3task4。最近我做了第五个任务:task5

我的问题是我从我的工作人员中注释掉了它task5,从 settings.py 中删除了它的提及并重新启动了 celerybeat 和我的 celery 工作人员。但我仍然看到task5定期出现(自然地在工人日志中抛出错误)。

为什么会发生这种情况,如何更新定期任务?


在 settings.py 中,我有:

    import djcelery
    djcelery.setup_loader()
    # config settings for Celery Daemon

    # Redis broker
    BROKER_URL = 'redis://localhost:6379/0'

    BROKER_TRANSPORT = 'redis'

    # List of modules to import when celery starts, in myapp.tasks form. 
    CELERY_IMPORTS = ('myapp.tasks', )  

    CELERY_ALWAYS_EAGER = False

    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    #The backend is the …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task supervisord celerybeat

5
推荐指数
2
解决办法
4220
查看次数

使用python而不使用django在celery中执行周期性任务

大家好,我是 celery 和 python 的新手。我正在使用rabbitmq-server 创建一个简单的任务。但我不知道如何在 python 中使用 celerybeat 实现周期性任务。我搜索但每个地方我都会用 django 定期执行任务。

我使用此代码作为tasks.py: from celery import Celery from time import strftime

app = Celery('tasks',broker='pyamqp://guest@localhost//')

@app.task
def show_time():
    return strftime('%Y-%m-%d %H:%M:%S')
Run Code Online (Sandbox Code Playgroud)

运行任务.py:

from tasks import show_time
show_time.delay()
Run Code Online (Sandbox Code Playgroud)

谢谢你的时间。

python-2.7 celery-task celerybeat

5
推荐指数
1
解决办法
1372
查看次数

Celery:了解大局

Celery 似乎是一个很棒的工具,但我很难理解各个 Celery 组件如何协同工作:

  • 工人
  • 应用程序
  • 任务
  • 消息代理(如 RabbitMQ)

据我了解,命令行:

celery -A not-clear-what-this-option-is worker
Run Code Online (Sandbox Code Playgroud)

应该运行某种芹菜“工作服务器”,它本身需要连接到代理服务器(我不太确定为什么需要这么多服务器)。

然后在任何 python 代码中,可以通过实例化应用程序将某些任务发送给工作人员:

app = Celery('my_module', broker='pyamqp://guest@localhost//')
Run Code Online (Sandbox Code Playgroud)

然后通过以下方式使用此应用程序装饰函数:

@app.tasks
def my_func():
    ...
Run Code Online (Sandbox Code Playgroud)

这样“my_func()”现在可以被称为“my_func.delay()”以异步方式运行。

这是我的问题:

  • 调用 my_func.delay() 时会发生什么?哪个服务器首先与哪个服务器通信?并将什么发送到哪里?
  • celery 命令的“-A”后面的选项是什么?这真的需要吗?
  • 假设我有一个进程 X,它实例化一个 Celery 应用程序来启动任务 A,并且假设我有另一个进程 Y,它想知道 X 启动的任务 A 的状态。我认为 Y 有一种方法可以做到这一点,但是我不知道怎么办。我认为 Y 应该创建自己的 Celery 应用程序实例。但是之后:
    • 在 Y 的 celery 应用程序中调用什么函数来获取此信息(以及进程 Y 内任务 A 的“标识符”是什么)?
    • 就通信而言,它是如何工作的,即请求何时通过 Broker,何时到达工作人员?

如果有人有关于这些问题的一些信息,我将不胜感激。我打算在 Django 项目中使用 Celery,其中对服务器的一些请求可以触发各种耗时的任务,和/或查询先前启动的任务的状态(待处理、已完成、错误等)。

python celery celery-task django-celery

5
推荐指数
1
解决办法
1718
查看次数

Celery 在设置时间时击败不发送 crontab 任务

我正在使用 celery 4.1,除了我在 crontab 任务中设置时间的地方之外,所有定期任务都可以正常工作。我认为这与时区设置有关,但我似乎无法弄清楚问题出在哪里。

仪表板/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('dashboard',
         broker='redis://',
         backend='redis://localhost',
         include=['dashboard.tasks'])

app.conf.update(
    result_expires=3600,
    enable_utc = False,
    timezone = 'America/New_York'
)


if __name__ == '__main__':
    app.start()
Run Code Online (Sandbox Code Playgroud)

这有效:

@app.task
@periodic_task(run_every=(crontab()))
def shutdown_vms():
    inst = C2CManage(['stop','kube'])
    inst.run()
    return
Run Code Online (Sandbox Code Playgroud)

这有效:

@app.task
@periodic_task(run_every=(crontab(minute=30,hour='*')))
def shutdown_vms():
    inst = C2CManage(['stop','kube'])
    inst.run()
    return
Run Code Online (Sandbox Code Playgroud)

这不起作用:

@app.task
@periodic_task(run_every=(crontab(minute=30,hour=6)))
def shutdown_vms():
    inst = C2CManage(['stop','kube'])
    inst.run()
    return
Run Code Online (Sandbox Code Playgroud)

Beat 很好地完成了任务:

<ScheduleEntry: dashboard.tasks.shutdown_vms dashboard.tasks.shutdown_vms() <crontab: 30 6 * * * (m/h/d/dM/MY)>
Run Code Online (Sandbox Code Playgroud)

但它从不发送它。我已经让进程运行了一个周末,但它从未提交任务。我不知道我做错了什么。我确实有其他按 timedelta 周期运行的任务,它们都运行得很好。 …

python celery celery-task

5
推荐指数
1
解决办法
6969
查看次数

如何在 celery 中测试 on_failure

我的芹菜任务有一个基类,其中on_failure实现了一个方法。

在我的测试中,我修补了任务调用的方法之一,以引发异常但从on_faliure未被调用。

基类

class BaseTask(celery.Task):
    abstract = True 

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("error")
Run Code Online (Sandbox Code Playgroud)

任务

@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
    logic.method(a, b)
Run Code Online (Sandbox Code Playgroud)

测试

@patch('tasks.logic.method')
def test_something(self, mock):
    # arrange
    mock.side_effect = NotImplementedError

    # act
    with self.assertRaises(NotImplementedError):
        multiple(1, 2)
Run Code Online (Sandbox Code Playgroud)

当运行 celery 并引发异常时,一切正常。 CELERY_ALWAYS_EAGER被激活。

我怎样才能on_faliure跑?

python unit-testing celery celery-task

5
推荐指数
1
解决办法
2667
查看次数

使用celery任务上传文件到s3

我正在尝试将视频文件上传到 s3,但在使用 celery 放入任务队列后。当视频上传时,用户可以做其他事情。

我的views.py调用celery任务

def upload_blob(request, iterator, interview_id, candidate_id, question_id):
    try:
        interview_obj = Interview.objects.get(id=interview_id)
    except ObjectDoesNotExist:
        interview_obj = None
    current_interview = interview_obj
    if request.method == 'POST':
        print("inside POST")
        # newdoc1 = Document(upload=request.FILES['uploaded_video'], name="videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc1.save()
        save_document_model.delay(request.FILES['uploaded_video'],"videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc2 = Document(upload=request.FILES['uploaded_audio'], name="audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc2.save()
        save_document_model.delay(request.FILES['uploaded_audio'],"audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        iterator = str(int(iterator) + 1)

        return HttpResponseRedirect(reverse('candidate:show_question', kwargs={'iterator': iterator,'interview_id':current_interview.id,'question_id':question_id}))
    else:

        return render(request, 'candidate/record_answer.html')
Run Code Online (Sandbox Code Playgroud)

实际的芹菜任务.py

@task(name="save_document_model")
def save_document_model(uploaded_file, file_name):

    newdoc = Document(upload=uploaded_file, name=file_name)
    newdoc.save()

    logger.info("document saved successfully")
    return HttpResponse("document saved successfully")
Run Code Online (Sandbox Code Playgroud)

文档模型

def upload_function(instance, filename):
    getname …
Run Code Online (Sandbox Code Playgroud)

django file-upload amazon-s3 celery-task django-celery

5
推荐指数
1
解决办法
4622
查看次数

从 Asyncio 事件循环部署 Celery 任务

我有一个当前使用 asyncio 编写的后端应用程序:用于 Web 服务器的 fastapi,用于异步数据库驱动程序的 sqlalchemy 1.4 + asyncpg。我需要将任务部署给将运行和更新主机应用程序的工作人员。目前我正在使用aio_pika,但想要更强大的东西,例如celerywith flower

我知道 celery 没有与 asyncio 集成。我也读过这样的答案,我担心的是任务不是异步的,这是微不足道的。我担心从主事件循环内启动任务。

我的主要问题是my_task.delay()/是否完全my_task.apply_async()阻塞正在运行的线程?如果是这样,更好的方法是使用来自中央或 a的多处理工作人员,然后仅从该工作人员进程部署 celery 任务吗?getmp.QueueProcessPoolExecutor

我想要部署任务,并且最好在任务完成时收到通知。不过,这可以通过界面在任务本身内完成fastapi。我只是想确保部署任务不会阻止异步事件循环。

python asynchronous celery celery-task python-asyncio

5
推荐指数
1
解决办法
1774
查看次数

如何让 celery Worker 停止接收新任务 (Kubernetes)

因此,我们有一个 kubernetes 集群,运行一些带有 celery 工作线程的 pod。我们使用 python3.6 来运行这些工作程序,celery 版本是 3.1.2(我知道,真的很旧,我们正在努力升级它)。我们还设置了一些自动缩放机制来动态添加更多芹菜工作人员。

问题如下。假设在任何给定时间我们都有 5 个工人。然后会出现大量任务,从而增加 Pod 的 CPU/RAM 使用率。这会触发一个自动缩放事件,比方说,添加两个 celery 工作单元。所以现在这两个新的 celery 工人承担了一些长时间运行的任务。在完成运行这些任务之前,kubernetes 会创建一个缩减事件,杀死这两个工作人员,并杀死那些长时间运行的任务。

此外,由于遗留原因,如果任务未完成,我们没有重试机制(并且我们现在无法实现)。

所以我的问题是,有没有办法告诉 kubernetes 等待 celery 工作线程运行完所有待处理的任务?我想解决方案必须包括某种方法来通知芹菜工作人员以使其停止接收新任务。现在我知道 Kubernetes 有一些脚本可以处理这种情况,但我不知道在这些脚本上写什么,因为我不知道如何让 celery Worker 停止接收任务。

任何想法?

celery celery-task python-3.x django-celery kubernetes

5
推荐指数
1
解决办法
2316
查看次数

Docker 中的 Celery kombu.exceptions.ContentDisallowed

我正在使用 celery 和 fastAPI。

获取无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任内容 (application/json)')在 docker 中运行时,在没有 docker 的本地计算机上运行相同的内容时,没有问题。

其配置如下。

celery_app = Celery('cda-celery-tasks',
                    broker=CFG.BROKER_URL,
                    backend=CFG.BACKEND_URL,
                    include=['src.tasks.tasks']
                    )

celery_app.conf.task_serializer = 'pickle'
celery_app.conf.result_serializer = 'pickle'
celery_app.conf.accept_content = ['pickle']
celery_app.conf.enable_utc = True

Run Code Online (Sandbox Code Playgroud)

在 docker 中运行时,我不断收到错误

FROM python:3.8
WORKDIR /app

COPY . .

RUN pip3 install poetry
ENV PATH="/root/.poetry/bin:$PATH"

RUN poetry install
Run Code Online (Sandbox Code Playgroud)

使用 kubernetes 中的以下命令启动 celery。

poetry run celery -A src.infrastructure.celery_application worker --loglevel=INFO --concurrency 2

运行时我不断收到错误

无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任内容 (application/json)')

body: '{"method": "enable_events", "arguments": {}, "destination": null, "pattern": null, …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task docker kubernetes

5
推荐指数
1
解决办法
1259
查看次数