第一次尝试启动 Celery 但出现如下错误,我已经安装了 redis 并且它的启动正常,但仍然不知何故 django 似乎有问题,
File "<frozen importlib._bootstrap_external>", line 848, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/atif/Documents/celery_test/celery-env/lib/python3.8/site-packages/kombu/transport/redis.py", line 263, in <module>
class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
AttributeError: 'NoneType' object has no attribute 'Redis'
Run Code Online (Sandbox Code Playgroud)
芹菜.py
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')
app = Celery('celery_test',)
app.config_from_object('django.conf:settings')
# Load task modules from all registered Django apps.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Run Code Online (Sandbox Code Playgroud)
设置
#celery stuff ---------------
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json' …Run Code Online (Sandbox Code Playgroud) 我在我的Mac(OS/X 10.13.4)上本地运行RabbitMQ和Celery,当我运行add.delay(x,y)时,以下代码在本地运行:
#!/usr/bin/env python
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', \
broker='pyamqp://appuser:xx@c2/appvhost', \
backend='db+mysql://appuser:xx@c2/pigpen')
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs {0.kwargs!r}'.format(self.request))
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试在运行Kali 2018.2的ODROID-C2上运行Celery worker时(w.当前更新,运行时出现以下错误celery -A tasks worker --loglevel=info:
Traceback (most recent call last):
File "/usr/local/bin/celery", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 14, in main
_main()
File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 326, …Run Code Online (Sandbox Code Playgroud) 我正在开发一个扫描用户gmail收件箱并提供报告的项目.我已经在heroku中部署了以下规范:
语言:Python 2.7
框架:Django 1.8
任务调度程序:Celery(Rabbitmq-bigwig for broker url)
现在当heroku执行它时,芹菜没有给我输出.在Heroku推动其显示Collectstatic配置错误.我尝试过使用whitenoise包
还尝试执行:heroku运行python manage.py collectstatic --dry-run --noinput 仍然得到相同的错误.
$ heroku运行python manage.py collectstatic --noinput给出了错误的以下细节.
File "manage.py", line 10, in <module>
execute_from_command_line(sys.argv)
File "/app/.heroku/python/lib/python2.7/site-packages/django/core/management/__init__.py", line 338, in execute_from_command_line
utility.execute()
File "/app/.heroku/python/lib/python2.7/site-packages/django/core/management/__init__.py", line 303, in execute
settings.INSTALLED_APPS
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 48, in __getattr__
self._setup(name)
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 44, in _setup
self._wrapped = Settings(settings_module)
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 92, in __init__
mod = importlib.import_module(self.SETTINGS_MODULE)
File …Run Code Online (Sandbox Code Playgroud) 我想使用Amazon SQS作为Celery支持的经纪人.有关于Kombu的SQS传输实现,Celery依赖它.但是没有足够的文档来使用它,所以我找不到如何在Celery上配置SQS.是否有人成功在Celery上配置SQS?
我使用芹菜来更新我的新闻聚合网站中的RSS提要.我为每个feed使用一个@task,事情看起来效果很好.
有一个细节,我不能确定处理得好:所有的feed都是每分钟用@periodic_task更新一次,但是如果一个feed仍在从最后一个周期性任务中更新时新的一个怎么办?(例如,如果Feed非常慢,或者离线且任务保持在重试循环中)
目前我存储任务结果并检查其状态如下:
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed
_results = {}
@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
for feed in Feed.objects.all():
if feed.pk in _results:
if not _results[feed.pk].ready():
# The task is not finished yet
continue
_results[feed.pk] = update_feed.delay(feed)
@task()
def update_feed(feed):
try:
feed.fetch_articles()
except socket.error, exc:
update_feed.retry(args=[feed], exc=exc)
Run Code Online (Sandbox Code Playgroud)
也许有一种更复杂/更健壮的方法可以使用我错过的一些芹菜机制来实现相同的结果?
我正在使用Celery来管理异步任务.然而,偶尔芹菜过程会失效,导致任何任务都无法执行.我希望能够检查芹菜的状态并确保一切正常,如果我发现任何问题,则向用户显示错误消息.从Celery Worker文档看起来我可能能够使用ping或者检查这个,但ping感觉很乱,并且不清楚究竟如何使用inspect(如果inspect().registered()是空的?).
任何有关这方面的指导将不胜感激.基本上我正在寻找的方法是这样的:
def celery_is_alive():
from celery.task.control import inspect
return bool(inspect().registered()) # is this right??
Run Code Online (Sandbox Code Playgroud)
编辑:它看起来甚至不像celery 2.3.3上的registered()(即使2.1文档列出它).也许ping是正确的答案.
编辑:Ping似乎也没有做我认为会做的事情,所以仍然不确定这里的答案.
在我的/etc/defaults/celeryd配置文件中,我设置了:
CELERYD_NODES="agent1 agent2 agent3 agent4 agent5 agent6 agent7 agent8"
CELERYD_OPTS="--autoscale=10,3 --concurrency=5"
Run Code Online (Sandbox Code Playgroud)
据我所知,该守护进程会产生8名芹菜工人,但我完全不知道该做什么autoscale,concurrency一起做什么.我认为并发是指定工作者可以使用的最大线程数的一种方式,并且如果需要,自动缩放是工作者扩展和缩小子工作者的一种方式.
这些任务有效载荷很大(大约20-50kB),有2-3万个这样的任务,但每个任务都在不到一秒的时间内运行.我看到内存使用率飙升,因为代理将任务分配给每个工作人员,因此多次复制有效负载.
我认为问题在于配置,并且工作者+并发+自动缩放的组合过度,我希望更好地理解这三个选项的作用.
我正在使用celerybeat开始执行许多次要任务的主要任务.我已经写了两个任务.
有没有办法轻松做到这一点?Celery是否允许从任务中运行任务?
我的例子:
@task
def compute(users=None):
if users is None:
users = User.objects.all()
tasks = []
for user in users:
tasks.append(compute_for_user.subtask((user.id,)))
job = TaskSet(tasks)
job.apply_async() # raises a IOError: Socket closed
@task
def compute_for_user(user_id):
#do some stuff
Run Code Online (Sandbox Code Playgroud)
compute从celerybeat调用,但在尝试运行时会导致IOError apply_async.有任何想法吗?
我有一个debian盒用芹菜和rabbitmq运行任务大约一年.最近我注意到任务没有被处理,所以我登录系统,发现芹菜无法连接到rabbitmq.我重新启动了rabbitmq-server,尽管芹菜不再抱怨它现在没有执行新的任务.奇怪的是,rabbitmq正在疯狂地吞噬cpu和内存资源.重新启动服务器无法解决问题.花了几个小时在网上寻找解决方案后无济于事我决定重建服务器.
我用Debian 7.5,rabbitmq 2.8.4,芹菜3.1.13(Cipater)重建了新的服务器.大约一个小时左右,一切都工作得很好,直到芹菜开始再次抱怨它无法连接到rabbitmq!
[2014-08-06 05:17:21,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 6.00 seconds...
Run Code Online (Sandbox Code Playgroud)
我重新启动了rabbitmq service rabbitmq-server start和同样的问题:
rabbitmq开始再次膨胀,不断冲击cpu并慢慢接管所有ram并交换:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
21823 rabbitmq 20 0 908m 488m 3900 S 731.2 49.4 9:44.74 beam.smp
Run Code Online (Sandbox Code Playgroud)
结果rabbitmqctl status如下:
Status of node 'rabbit@li370-61' ...
[{pid,21823},
{running_applications,[{rabbit,"RabbitMQ","2.8.4"},
{os_mon,"CPO CXC 138 46","2.2.9"},
{sasl,"SASL CXC 138 11","2.2.1"},
{mnesia,"MNESIA CXC 138 12","4.7"},
{stdlib,"ERTS CXC 138 10","1.18.1"},
{kernel,"ERTS CXC …Run Code Online (Sandbox Code Playgroud) 似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
credentials=credentials, virtual_host='***')
def handle_delivery(body):
"""Called when we receive a message from RabbitMQ"""
print body
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
connection.channel(on_channel_open)
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.basic_recover(callback=handle_delivery,requeue=True)
try:
connection = pika.SelectConnection(parameters=parameters,\
on_open_callback=on_connected)
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on …Run Code Online (Sandbox Code Playgroud)