目前我在Django上有一个网站.但我打算运行更多的Django网站.
所以我想知道我是否需要celeryd为每个新网站运行或者一个就足够了.
我通过主管运行`celeryd守护进程
我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回?
因此MyTask.delay(some_arg="foo"),我可以调用actual_result = MyTask.dont_delay(some_arg="foo")它将阻塞并返回结果,而不是我必须四处走动并获取结果?
我正在使用芹菜殴打来安排一些任务.我可以使用CELERY_TIMEZONE设置来使用crontab计划安排任务,并在上述时区的预定时间运行.
但我希望能够在同一个应用程序中为不同的时区设置多个此类任务(单个django settings.py).我知道在安排任务时需要在哪个时区运行哪个任务.
是否可以为每个任务指定不同的时区?
我正在使用django(1.4)和芹菜(3.0.11)和django芹菜(3.0.11).
我看过这个djcelery.schedulers.DatabaseScheduler类和它的基类,但我无法弄清楚时区的使用方式和位置.我可以编写一个自定义调度程序,可以使每个作业在不同的时区运行吗?
谢谢,
从另一个stackoverflow回答,我试图限制芹菜的工人数量
在我终止所有芹菜工人后,我用新的配置重新启动了芹菜.
CELERYD_CONCURRENCY = 1(在Django的settings.py中)
然后我输入以下命令来检查有多少芹菜工人正在工作.
ps auxww | grep 'celery worker' | grep -v grep | awk '{print $2}'
它返回两个PID,如24803,24817.
然后我改变配置CELERYD_CONCURRENCY = 2并重新启动芹菜.
相同的命令返回三个PID,如24944,24958,24959.(如您所见,最后两个PID是顺序的)
这意味着按照我的预期增加了工人数量.
但是,我不知道为什么它会返回两个PID,即使只有一个芹菜工人正在工作?
是否有一些辅助过程来帮助工人?
我已经阅读了很多与此类似的帖子,但对我来说似乎没有任何意义。
我正在尝试将 Celery PeriodicTask 配置为每 5 秒触发一次,但我被 Celery 配置问题所困扰(我认为)
通讯/任务.py
import datetime
from celery.decorators import periodic_task
@periodic_task
def send_queued_messages():
# do something...
Run Code Online (Sandbox Code Playgroud)
我的应用程序/settings.py
...
from comm.tasks import send_queued_messages
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'send_queued_messages_every_5_seconds': {
'task': 'comm.tasks.send_queued_messages', # Is the issue here? I've tried a dozen variations!!
'schedule': timedelta(seconds=5),
},
}
Run Code Online (Sandbox Code Playgroud)
我的错误日志的相关输出:
23:41:00 worker.1 | [2015-06-10 03:41:00,657: ERROR/MainProcess] Received unregistered task of type 'send_queued_messages'.
23:41:00 worker.1 | The message has been ignored and discarded.
23:41:00 worker.1 |
23:41:00 …Run Code Online (Sandbox Code Playgroud) 我刚刚使用 RabbitMQ 在我的本地机器 (win10) 上安装了 celery 并遵循了他们的初学者教程。但我收到错误消息TypeError: 'tuple' object is not callable
src>workon clicknstrip
(clicknstrip) src>python manage.py celery beat
celery beat v3.1.18 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2015-08-13 10:01:13,441: INFO/MainProcess] beat: Starting...
[2015-08-13 10:01:13,466: WARNING/MainProcess] DB Reset: Account for new __version__ field
[2015-08-13 10:01:13,470: CRITICAL/MainProcess] beat …Run Code Online (Sandbox Code Playgroud) 我正在使用 Celery 和 Redis。我的 tasks.py 文件中有以下代码:
from celery import Celery
from faker import Factory
fake = Factory.create()
app = Celery("tasks")
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'
@app.task
def twitterDP(hashtag):
if hashtag:
return ["From Twitter " + fake.text(20) + " hashtag # " + hashtag for x in range(5)]
return []
Run Code Online (Sandbox Code Playgroud)
为了运行任务,我还有另一个包含以下代码的脚本:
import zmq
from tasks import twitterDP
from celery.result import AsyncResult
import time
class WorkFlow(object):
def __init__(self):
self.ctx = zmq.Context()
self.socket_pull = self.ctx.socket(zmq.PULL)
self.socket_pull.bind("tcp://127.0.0.1:5860")
def do_work(self):
while True:
recv_msg = …Run Code Online (Sandbox Code Playgroud) 这是否可以将 django-celery 守护进程作为 virtualenv 中项目的 systemd 服务?
这是我的配置:
/etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target
[Service]
Type=forking
User=vagrant
Group=vagrant
Restart=no
WorkingDirectory=/vagrant/myproj/
ExecStart=/bin/sh -c '/var/www/vhost/myproj_env/bin/python \
/vagrant/myproj/manage.py celery worker \
--loglevel=DEBUG \
--logfile=/var/log/celery/worker.log \
--pidfile=/var/run/celery/worker.pid \
-Q availability,celery --time-limit=300'
ExecStop=/bin/sh -c '/var/www/vhost/myproj_env/bin/python \
/vagrant/myproj/manage.py celery stop \
--pidfile=/var/run/celery/worker.pid'
[Install]
WantedBy=multi-user.target
Run Code Online (Sandbox Code Playgroud)
这里提到的所有目录确实存在,并且权限设置正确
settings.py 中的 django-celery:
INSTALLED_APPS = (
...
'djcelery',
'celery_haystack',
...
)
import djcelery
djcelery.setup_loader()
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = BROKER_URL
CELERY_REDIS_MAX_CONNECTIONS = 30
Run Code Online (Sandbox Code Playgroud)
以下命令正常启动celery,可以看到正在执行的任务:
python manage.py celery worker --loglevel=INFO …Run Code Online (Sandbox Code Playgroud) 文件结构
proj/proj/
celery.py
(and other files)
/sitesettings/
tasks.py
(and other files)
Run Code Online (Sandbox Code Playgroud)
芹菜.py
app = Celery('mooncake',broker_url = 'amqp://')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)
站点设置/任务.py
from __future__ import absolute_import, unicode_literals
from comma.models import Post
from mooncake.celery import app
app.conf.beat_schedule = {
'every-5-seconds': {
'task': 'sitesettings.tasks.statisticsTag',
'schedule': 5.0,
'args': ()
},
}
@app.task
def statisticsTag():
print(Post.objects.all()[0])
Run Code Online (Sandbox Code Playgroud)
并运行它
celery -A proj beat -l info
Run Code Online (Sandbox Code Playgroud)
把它拿出来
[2019-02-22 18:21:08,346: INFO/MainProcess] Scheduler: Sending due task every-5-seconds (sitesettings.tasks.statisticsTag)
Run Code Online (Sandbox Code Playgroud)
但没有进一步的输出。我曾经尝试在 proj/celery.py 中编写它,但它无法运行,因为我必须从另一个应用程序导入,它以“应用程序未加载”错误退出。所以我该怎么做?
我有一个 Django 应用程序,它使用通道来监视 WebSocket 以启动 Celery 中的后端任务。它当前睡眠给定的数量,然后返回 true。
问题是我不知道如何从 celery 任务中访问 WebSocket,所以我可以在完成后通知 UI。
celery==4.3.0
channels==2.2.0
Django==2.2.4
django-celery-results==1.1.2
djangorestframework==3.10.2
Run Code Online (Sandbox Code Playgroud)
我的任务.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def gotosleep(timeInSecs):
time.sleep(timeInSecs)
return True
Run Code Online (Sandbox Code Playgroud)
我的消费者.py
from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import gotosleep
class AccessConsumer(WebsocketConsumer):
def connect(self):
self.accept()
def disconnect(self, close_code):
pass
def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
if message.isnumeric() == True:
print("------------------------------------------------------")
print(message)
gotosleep.delay(int(message))
self.send(text_data=json.dumps({
'message': 'We are dealing with your request' …Run Code Online (Sandbox Code Playgroud)