标签: django-celery

如果我在Django中运行多个站点,我是否需要执行单独的celeryd

目前我在Django上有一个网站.但我打算运行更多的Django网站.

所以我想知道我是否需要celeryd为每个新网站运行或者一个就足够了.

我通过主管运行`celeryd守护进程

django celery django-celery

2
推荐指数
1
解决办法
278
查看次数

Celery 中的阻塞版本延迟?

我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回?

因此MyTask.delay(some_arg="foo"),我可以调用actual_result = MyTask.dont_delay(some_arg="foo")它将阻塞并返回结果,而不是我必须四处走动并获取结果?

celery django-celery

2
推荐指数
1
解决办法
2290
查看次数

芹菜击败 - 每个任务的不同时区

我正在使用芹菜殴打来安排一些任务.我可以使用CELERY_TIMEZONE设置来使用crontab计划安排任务,并在上述时区的预定时间运行.

但我希望能够在同一个应用程序中为不同的时区设置多个此类任务(单个django settings.py).我知道在安排任务时需要在哪个时区运行哪个任务.

是否可以为每个任务指定不同的时区?

我正在使用django(1.4)和芹菜(3.0.11)和django芹菜(3.0.11).

我看过这个djcelery.schedulers.DatabaseScheduler类和它的基类,但我无法弄清楚时区的使用方式和位置.我可以编写一个自定义调度程序,可以使每个作业在不同的时区运行吗?

谢谢,

python django django-celery celerybeat

2
推荐指数
2
解决办法
4229
查看次数

芹菜:CELERYD_CONCURRENCY和工人数量

另一个stackoverflow回答,我试图限制芹菜的工人数量

在我终止所有芹菜工人后,我用新的配置重新启动了芹菜.

CELERYD_CONCURRENCY = 1(在Django的settings.py中)

然后我输入以下命令来检查有多少芹菜工人正在工作.

ps auxww | grep 'celery worker' | grep -v grep | awk '{print $2}'

它返回两个PID,如24803,24817.

然后我改变配置CELERYD_CONCURRENCY = 2并重新启动芹菜.

相同的命令返回三个PID,如24944,24958,24959.(如您所见,最后两个PID是顺序的)

这意味着按照我的预期增加了工人数量.

但是,我不知道为什么它会返回两个PID,即使只有一个芹菜工人正在工作?

是否有一些辅助过程来帮助工人?

worker celery django-celery

2
推荐指数
1
解决办法
1839
查看次数

Celery“收到未注册的任务类型”

我已经阅读了很多与此类似的帖子,但对我来说似乎没有任何意义。

我正在尝试将 Celery PeriodicTask 配置为每 5 秒触发一次,但我被 Celery 配置问题所困扰(我认为)

通讯/任务.py

import datetime
from celery.decorators import periodic_task

@periodic_task
def send_queued_messages():
    # do something...
Run Code Online (Sandbox Code Playgroud)

我的应用程序/settings.py

...
from comm.tasks import send_queued_messages
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        'task': 'comm.tasks.send_queued_messages',   # Is the issue here?  I've tried a dozen variations!!
        'schedule': timedelta(seconds=5),
        },
    }
Run Code Online (Sandbox Code Playgroud)

我的错误日志的相关输出:

23:41:00 worker.1 | [2015-06-10 03:41:00,657: ERROR/MainProcess] Received unregistered task of type 'send_queued_messages'.
23:41:00 worker.1 | The message has been ignored and discarded.
23:41:00 worker.1 | 
23:41:00 …
Run Code Online (Sandbox Code Playgroud)

python celery django-celery celerybeat

2
推荐指数
1
解决办法
2万
查看次数

django celery 获取类型错误:“元组”对象不可调用

我刚刚使用 RabbitMQ 在我的本地机器 (win10) 上安装了 celery 并遵循了他们的初学者教程。但我收到错误消息TypeError: 'tuple' object is not callable

src>workon clicknstrip
(clicknstrip) src>python manage.py celery beat

celery beat v3.1.18 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-08-13 10:01:13,441: INFO/MainProcess] beat: Starting...
[2015-08-13 10:01:13,466: WARNING/MainProcess] DB Reset: Account for new __version__ field
[2015-08-13 10:01:13,470: CRITICAL/MainProcess] beat …
Run Code Online (Sandbox Code Playgroud)

django celery django-celery

2
推荐指数
1
解决办法
1280
查看次数

TypeError Celery 即使参数正确

我正在使用 Celery 和 Redis。我的 tasks.py 文件中有以下代码:

from celery import Celery
from faker import Factory
fake = Factory.create()

app = Celery("tasks")
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'

@app.task
def twitterDP(hashtag):
    if hashtag:
        return ["From Twitter " + fake.text(20) + " hashtag # " + hashtag for x in range(5)]
    return []
Run Code Online (Sandbox Code Playgroud)

为了运行任务,我还有另一个包含以下代码的脚本:

import zmq
from tasks import twitterDP
from celery.result import AsyncResult

import time

class WorkFlow(object):
    def __init__(self):
        self.ctx = zmq.Context()
        self.socket_pull = self.ctx.socket(zmq.PULL)
        self.socket_pull.bind("tcp://127.0.0.1:5860")

    def do_work(self):
        while True:
            recv_msg = …
Run Code Online (Sandbox Code Playgroud)

python celery django-celery

2
推荐指数
1
解决办法
2001
查看次数

django-celery 作为 virtualenv 中的 systemd 服务

这是否可以将 django-celery 守护进程作为 virtualenv 中项目的 systemd 服务?

这是我的配置:

/etc/systemd/system/celery.service

[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=vagrant
Group=vagrant
Restart=no
WorkingDirectory=/vagrant/myproj/
ExecStart=/bin/sh -c '/var/www/vhost/myproj_env/bin/python \ 
   /vagrant/myproj/manage.py celery worker \ 
   --loglevel=DEBUG \
   --logfile=/var/log/celery/worker.log \ 
   --pidfile=/var/run/celery/worker.pid \
   -Q availability,celery --time-limit=300'
ExecStop=/bin/sh -c '/var/www/vhost/myproj_env/bin/python \ 
    /vagrant/myproj/manage.py celery stop \
   --pidfile=/var/run/celery/worker.pid'

[Install]
WantedBy=multi-user.target
Run Code Online (Sandbox Code Playgroud)

这里提到的所有目录确实存在,并且权限设置正确

settings.py 中的 django-celery:

INSTALLED_APPS = (
    ...
    'djcelery',
    'celery_haystack',
    ...
)

import djcelery
djcelery.setup_loader()

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = BROKER_URL
CELERY_REDIS_MAX_CONNECTIONS = 30
Run Code Online (Sandbox Code Playgroud)

以下命令正常启动celery,可以看到正在执行的任务:

python manage.py celery worker --loglevel=INFO …
Run Code Online (Sandbox Code Playgroud)

python django celery django-celery systemd

2
推荐指数
1
解决办法
4620
查看次数

芹菜定期任务未在 Django 中运行

文件结构

proj/proj/
         celery.py
         (and other files)
    /sitesettings/
         tasks.py
         (and other files)
Run Code Online (Sandbox Code Playgroud)

芹菜.py

app = Celery('mooncake',broker_url = 'amqp://')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)

站点设置/任务.py

from __future__ import absolute_import, unicode_literals
from comma.models import Post
from mooncake.celery import app

app.conf.beat_schedule = {
'every-5-seconds': {
    'task': 'sitesettings.tasks.statisticsTag',
    'schedule': 5.0,
    'args': ()
},
}

@app.task
def statisticsTag():
    print(Post.objects.all()[0])
Run Code Online (Sandbox Code Playgroud)

并运行它

celery -A proj beat -l info
Run Code Online (Sandbox Code Playgroud)

把它拿出来

[2019-02-22 18:21:08,346: INFO/MainProcess] Scheduler: Sending due task every-5-seconds (sitesettings.tasks.statisticsTag)
Run Code Online (Sandbox Code Playgroud)

但没有进一步的输出。我曾经尝试在 proj/celery.py 中编写它,但它无法运行,因为我必须从另一个应用程序导入,它以“应用程序未加载”错误退出。所以我该怎么做?

python django celery django-celery celerybeat

2
推荐指数
1
解决办法
2188
查看次数

在 Celery 任务中获取 Django 频道访问权限

我有一个 Django 应用程序,它使用通道来监视 WebSocket 以启动 Celery 中的后端任务。它当前睡眠给定的数量,然后返回 true。

问题是我不知道如何从 celery 任务中访问 WebSocket,所以我可以在完成后通知 UI。

celery==4.3.0
channels==2.2.0
Django==2.2.4
django-celery-results==1.1.2
djangorestframework==3.10.2
Run Code Online (Sandbox Code Playgroud)

我的任务.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def gotosleep(timeInSecs):
    time.sleep(timeInSecs)
    return True
Run Code Online (Sandbox Code Playgroud)

我的消费者.py

from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import gotosleep

class AccessConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()


    def disconnect(self, close_code):
        pass

    def receive(self, text_data):
       text_data_json = json.loads(text_data)
       message = text_data_json['message']
        if message.isnumeric() == True:
            print("------------------------------------------------------")
            print(message)

            gotosleep.delay(int(message))

            self.send(text_data=json.dumps({
                'message': 'We are dealing with your request' …
Run Code Online (Sandbox Code Playgroud)

django celery django-celery django-channels

2
推荐指数
1
解决办法
1715
查看次数