标签: django-celery

在 Rabbitmq 和 celery 中设置 Autoack true

我正在使用celery和rabbitmq,但是由于在队列中推送了多个任务,我的服务器内存利用率变得超过40%,因此rabbit将不再接受任何任务。所以我想删除那些已经执行的消息,但是由于rabbitmq的持久行为,这些消息不会自动删除,所以我想设置一些配置,例如 autoAck=True ,这样如果从 celery 消耗消息,它将从rabbitmq 队列以及我的服务器内存。请解释一下我们该如何做到这一点。

rabbitmq celery django-celery

1
推荐指数
1
解决办法
7035
查看次数

Celery worker ImportError:没有名为“项目”的模块

当我尝试启动工作程序时,出现一个问题:
ImportError:没有名为“ project”的模块

追溯(最近一次通话):

  File "/usr/local/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.5/dist-packages/celery/__main__.py", line 16, in main
    _main()
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 322, in main
    cmd.execute_from_commandline(argv)
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 496, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 273, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 479, in setup_app_from_commandline
    self.app = self.find_app(app)
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 501, in find_app
    return find_app(app, symbol_by_name=self.symbol_by_name)
  File "/usr/local/lib/python3.5/dist-packages/celery/app/utils.py", line 359, in find_app
    sym = symbol_by_name(app, imp=imp)
  File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 504, in symbol_by_name
    return imports.symbol_by_name(name, imp=imp)
  File …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-celery

1
推荐指数
1
解决办法
1524
查看次数

我们如何完成 celery period_task 中的停止任务?

撤销 @periodic_task 发送的任务Discarding revoked tasks & Due task to workers.

芹菜工人屏幕截图

[2018-09-17 12:23:50,864: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:23:50,864: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx] [2018-09-17 12:24:00,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add) [2018-09-17 12:24:00,869: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx] [2018-09-17 12:24:10,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add) [2018-09-17 12:24:10,868: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]


任务.py

@periodic_task(run_every=timedelta(seconds=10),options={"task_id":"xxxxxxx"})
def add():
     call(["ping","-c10","google.com"])


def stop():
    x = revoke("xxxxxxx",terminate=True,signal="KILL")
    print(x) …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task django-celery celerybeat

1
推荐指数
1
解决办法
6500
查看次数

如何禁用 celery 任务结果记录?

我有正在运行的 Celery 3.1 应用程序,它记录了一些敏感信息。理想情况下,我希望有相同的日志,但没有结果部分。

目前它看起来像:

worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s: yyyyyyy
Run Code Online (Sandbox Code Playgroud)

我想拥有:

worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s
Run Code Online (Sandbox Code Playgroud)

怎么做?

编辑:看来这可以完成这项工作:https://docs.celeryproject.org/en/3.1/reference/celery.worker.job.html#celery.worker.job.Request.success_msg但我不知道如何来实际使用它。

celery celery-task django-celery

1
推荐指数
1
解决办法
3297
查看次数

如何将 Django Celery 应用程序从 Elastic Beanstalk Amazon Linux 1 升级到 Amazon Linux 2

我正在尝试将运行 Celery 的 Django 应用程序从运行 Python 3.6 的 Amazon Elastic Beanstalk Linux 1 升级到使用 Python 3.8 的 Amazon Linux 2。

我在使用 Celery 应用程序时遇到问题。

在Linux 1中我有以下文件

#!/usr/bin/env bash

# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
celeryenv=${celeryenv%?}

# Create celery configuraiton script
celeryconf="[program:celeryd-worker]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery -A core worker -P solo --loglevel=INFO -n worker.%%h

directory=/opt/python/current/app/src
user=nobody
numprocs=1 …
Run Code Online (Sandbox Code Playgroud)

python linux django amazon-web-services django-celery

1
推荐指数
1
解决办法
783
查看次数

Django Celery Task 如何知道它运行了?

我对 Django Celery 有以下任务(使用 Amazon SQS)。

@task
def upload_task(request, **kwargs):
    file = request.FILES['file']
    ContactCSVModel.import_from_file(file)
    return True
Run Code Online (Sandbox Code Playgroud)

这似乎有效,即文件已成功添加到我的数据库中,但我不确定它是否使用 Celery。我怎么知道这是否有效。我应该看点什么吗?在终端我跑了...

manage.py celery worker --loglevel=info,它指出我有一个名为 的任务contacts.tasks.upload_task,但我从未看到任何事情发生,它只是指出...

[2013-03-14 20:52:47,947:INFO/MainProcess]消费者:连接到 sqs://AJSUQJZKNSJA81JM@localhost//

知道我的任务是否已运行并完成(是的,我知道它已完成,因为它在数据库中,但是通过 Celery 实现的吗?)

这是任务运行的地方

视图.py

@login_required
def upload(request):
    # If we had a POST then get the request post values.
    if request.method == 'POST':
        form = ContactUploadForm(request.POST, request.FILES)
        # Check we have valid data
        if form.is_valid():
            upload_task(request)
            #file = request.FILES['file']
            #ContactCSVModel.import_from_file(file)
            messages.add_message(request, messages.SUCCESS, 'Items have been added')
        else:
            messages.add_message(request, messages.ERROR, ' …
Run Code Online (Sandbox Code Playgroud)

django django-celery

0
推荐指数
1
解决办法
879
查看次数

芹菜击败args:列表与元组

芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.

我有一个任务,只需一个参数,一个整数列表:

@shared_task
def schedule_by_ids(ids):
  ...
Run Code Online (Sandbox Code Playgroud)

我的celerybeat时间表如下:

CELERYBEAT_SCHEDULE = {
    'schedule_by_ids': {
        'task': 'myproj.app.tasks.schedule_by_ids',
        'schedule': crontab(minute='*/10', hour='8-21'),
        'args': ([1,]),
    },
}
Run Code Online (Sandbox Code Playgroud)

我的任务失败,"int不可迭代" TypeError.根据我的显示器(芹菜花),args传递为[1].

当我将args作为列表时,例如[[1]],arg显示在监视器中,[[1]]并且它工作正常.

我的问题是:当它是一个元组时,它是如何通过args的?为什么?

python celery django-celery celerybeat

0
推荐指数
1
解决办法
1449
查看次数

使用celery将django模型中的保存方法重写为异步的最佳实践

我正在建立一个云系统,我有两个应用程序,包括完整功能的服务器应用程序,以及仅包含输入法的客户端应用程序,所以我在客户分支中安装客户端应用程序作为本地应用程序,

我希望在本地保存模型后覆盖应用程序中的任何模型,我将调用芹菜任务将此模型添加到队列中以确保它将到达,即使互联网已关闭,我将重试直到互联网起步,

现在我希望最佳实践能够以通用的方式对任何模型进行操作

我有两个选择

1-这样的覆盖保存方法

def save(self, *args, **kwargs):
    super(Model, self).save(*args, **kwargs)
    save_task.delay(self)
Run Code Online (Sandbox Code Playgroud)

或使用这样的信号

post_save.connect(save-task.delay, sender=Model)
Run Code Online (Sandbox Code Playgroud)

哪一个是最佳实践,我可以为这个项目的所有模型制作泛型?

django message-queue django-models celery django-celery

0
推荐指数
1
解决办法
1264
查看次数

为 Django 项目中的可重用应用程序创建基于类的 Celery 任务

对于 Django 项目来说,创建基于函数的任务非常干净。只需在 django 应用程序中创建tasks.py并开始编写任务,就像这个示例一样,该示例取自官方celery文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y
Run Code Online (Sandbox Code Playgroud)

但有时基于函数的任务是紧密耦合的并且可重用性不高。所以我想创建基于类的芹菜任务,该任务记录在官方网站中。在遵循https://github.com/celery/celery/issues/3874之后,我可以创建示例任务,但我不确定它是否是创建基于类的任务的正确方法。

from __future__ import absolute_import, unicode_literals
from celery import shared_task, Task
import time
from celery import current_app


@shared_task
def add(x, y):
    time.sleep(5)
    return x + y


@shared_task
def mul(x, y):
    return x * y

# Sample class based task for testing
class AnotherTask(current_app.Task):
    name = 'tasks.another_task'
    def …
Run Code Online (Sandbox Code Playgroud)

python django task celery django-celery

0
推荐指数
1
解决办法
5948
查看次数

我们如何在 Django 中创建异步 API?

我想创建一个第三方聊天机器人 API,它是异步的,并在 10 秒暂停后回复“ok”。

import time

def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    return wait()
Run Code Online (Sandbox Code Playgroud)

我已经尝试了 celery,如下所示,我正在等待 celery 响应:

import time
from celery import shared_task

@shared_task
def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    a = wait.delay()
    work = AsyncResult(a.id)
    while True:
        if work.ready():
           return work.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)

但是这个解决方案是同步工作的,没有区别。我们如何在不要求用户继续请求直到收到结果的情况下使其异步?

python django asynchronous celery django-celery

0
推荐指数
1
解决办法
5058
查看次数

Celery/Django/Redis 同一任务被执行多次

在我当前的项目中,我需要做的是从 700 多个端点获取数据,然后将该数据发送到另外 700 多个端点。我的方法是使用 Django Celery Redis,在每个工作人员上放置 70 多个端点,以便有大约 10 个工作人员将检索数据然后发布数据。

为此,我使用 Chord 执行并行任务,然后计算所需的时间。

问题是 Celery 多次运行相同的任务。task_get_data是主要方法,首先获取网站列表,然后将其分成每组 70 个,然后task_post_data使用 Chord 调用。

在下面的输出中website_A,您可以website_B多次看到 等,我已经手动检查了我的数据和所有内容,并且没有重复的网站,但是当提交 celery 任务时,会创建多个条目。

另外,有没有办法监控工人的数量以及他们正在处理什么?

下面是代码

os.environ.setdefault('DJANGO_SETTINGS_MODULE','django_backend.settings')

app = Celery('django_backend', backend='redis://localhost:6379', broker='redis://localhost:6379')

app.config_from_object('django.conf:settings', namespace='CELERY')
# app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

def post_data(json_obj, website):
    for items in json_obj:
        md = md + items['data']
    n = 50
    list_of_objects = [md[i:i+n] for i in range(0, len(md), n)]
    
    print("Starting to post …
Run Code Online (Sandbox Code Playgroud)

python django celery django-celery

0
推荐指数
1
解决办法
2175
查看次数