标签: celery

celery无法使用redis

第一次尝试启动 Celery 但出现如下错误,我已经安装了 redis 并且它的启动正常,但仍然不知何故 django 似乎有问题,

File "<frozen importlib._bootstrap_external>", line 848, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/atif/Documents/celery_test/celery-env/lib/python3.8/site-packages/kombu/transport/redis.py", line 263, in <module>
    class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
AttributeError: 'NoneType' object has no attribute 'Redis'
Run Code Online (Sandbox Code Playgroud)

芹菜.py

from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')



    app = Celery('celery_test',)
    
    app.config_from_object('django.conf:settings')
    
    # Load task modules from all registered Django apps.
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
Run Code Online (Sandbox Code Playgroud)

设置

#celery stuff ---------------
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json' …
Run Code Online (Sandbox Code Playgroud)

django message-queue redis celery django-celery

59
推荐指数
2
解决办法
3万
查看次数

Celery AttributeError:异步错误

我在我的Mac(OS/X 10.13.4)上本地运行RabbitMQ和Celery,当我运行add.delay(x,y)时,以下代码在本地运行:

#!/usr/bin/env python
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', \
        broker='pyamqp://appuser:xx@c2/appvhost', \
        backend='db+mysql://appuser:xx@c2/pigpen')

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs {0.kwargs!r}'.format(self.request))

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试在运行Kali 2018.2的ODROID-C2上运行Celery worker时(w.当前更新,运行时出现以下错误celery -A tasks worker --loglevel=info:

Traceback (most recent call last):
  File "/usr/local/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 14, in main
    _main()
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 326, …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

58
推荐指数
5
解决办法
1万
查看次数

无法在heroku django中导入名称_uuid_generate_random

我正在开发一个扫描用户gmail收件箱并提供报告的项目.我已经在heroku中部署了以下规范:

语言:Python 2.7

框架:Django 1.8

任务调度程序:Celery(Rabbitmq-bigwig for broker url)

现在当heroku执行它时,芹菜没有给我输出.在Heroku推动其显示Collectstatic配置错误.我尝试过使用whitenoise包

还尝试执行:heroku运行python manage.py collectstatic --dry-run --noinput 仍然得到相同的错误.

$ heroku运行python manage.py collectstatic --noinput给出了错误的以下细节.

File "manage.py", line 10, in <module>
execute_from_command_line(sys.argv)
File "/app/.heroku/python/lib/python2.7/site-packages/django/core/management/__init__.py", line 338, in execute_from_command_line
utility.execute()
File "/app/.heroku/python/lib/python2.7/site-packages/django/core/management/__init__.py", line 303, in execute
settings.INSTALLED_APPS
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 48, in __getattr__
self._setup(name)
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 44, in _setup
self._wrapped = Settings(settings_module)
File "/app/.heroku/python/lib/python2.7/site-packages/django/conf/__init__.py", line 92, in __init__
mod = importlib.import_module(self.SETTINGS_MODULE)
File …
Run Code Online (Sandbox Code Playgroud)

python django heroku celery

57
推荐指数
2
解决办法
3万
查看次数

芹菜与亚马逊SQS

我想使用Amazon SQS作为Celery支持的经纪人.有关于Kombu的SQS传输实现,Celery依赖它.但是没有足够的文档来使用它,所以我找不到如何在Celery上配置SQS.是否有人成功在Celery上配置SQS?

amazon-sqs amazon-web-services celery kombu

51
推荐指数
2
解决办法
2万
查看次数

用芹菜运行"独特"的任务

我使用芹菜来更新我的新闻聚合网站中的RSS提要.我为每个feed使用一个@task,事情看起来效果很好.

有一个细节,我不能确定处理得好:所有的feed都是每分钟用@periodic_task更新一次,但是如果一个feed仍在从最后一个周期性任务中更新时新的一个怎么办?(例如,如果Feed非常慢,或者离线且任务保持在重试循环中)

目前我存储任务结果并检查其状态如下:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error, exc:
        update_feed.retry(args=[feed], exc=exc)
Run Code Online (Sandbox Code Playgroud)

也许有一种更复杂/更健壮的方法可以使用我错过的一些芹菜机制来实现相同的结果?

python django celery

48
推荐指数
5
解决办法
2万
查看次数

检测Celery是否可用/正在运行

我正在使用Celery来管理异步任务.然而,偶尔芹菜过程会失效,导致任何任务都无法执行.我希望能够检查芹菜的状态并确保一切正常,如果我发现任何问题,则向用户显示错误消息.从Celery Worker文档看起来我可能能够使用ping或者检查这个,但ping感觉很乱,并且不清楚究竟如何使用inspect(如果inspect().registered()是空的?).

任何有关这方面的指导将不胜感激.基本上我正在寻找的方法是这样的:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??
Run Code Online (Sandbox Code Playgroud)

编辑:它看起来甚至不像celery 2.3.3上的registered()(即使2.1文档列出它).也许ping是正确的答案.

编辑:Ping似乎也没有做我认为会做的事情,所以仍然不确定这里的答案.

python django celery django-celery

48
推荐指数
5
解决办法
3万
查看次数

芹菜在并发,工作和自动缩放之间的区别

在我的/etc/defaults/celeryd配置文件中,我设置了:

CELERYD_NODES="agent1 agent2 agent3 agent4 agent5 agent6 agent7 agent8"
CELERYD_OPTS="--autoscale=10,3 --concurrency=5"
Run Code Online (Sandbox Code Playgroud)

据我所知,该守护进程会产生8名芹菜工人,但我完全不知道该做什么autoscale,concurrency一起做什么.我认为并发是指定工作者可以使用的最大线程数的一种方式,并且如果需要,自动缩放是工作者扩展和缩小子工作者的一种方式.

这些任务有效载荷很大(大约20-50kB),有2-3万个这样的任务,但每个任务都在不到一秒的时间内运行.我看到内存使用率飙升,因为代理将任务分配给每个工作人员,因此多次复制有效负载.

我认为问题在于配置,并且工作者+并发+自动缩放的组合过度,我希望更好地理解这三个选项的作用.

python concurrency celery

48
推荐指数
2
解决办法
3万
查看次数

Celery任务运行更多任务

我正在使用celerybeat开始执行许多次要任务的主要任务.我已经写了两个任务.

有没有办法轻松做到这一点?Celery是否允许从任务中运行任务?

我的例子:

@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async() # raises a IOError: Socket closed

@task
def compute_for_user(user_id):
    #do some stuff
Run Code Online (Sandbox Code Playgroud)

compute从celerybeat调用,但在尝试运行时会导致IOError apply_async.有任何想法吗?

python django task celery

47
推荐指数
3
解决办法
3万
查看次数

RabbitMQ(beam.smp)和高CPU /内存负载问题

我有一个debian盒用芹菜和rabbitmq运行任务大约一年.最近我注意到任务没有被处理,所以我登录系统,发现芹菜无法连接到rabbitmq.我重新启动了rabbitmq-server,尽管芹菜不再抱怨它现在没有执行新的任务.奇怪的是,rabbitmq正在疯狂地吞噬cpu和内存资源.重新启动服务器无法解决问题.花了几个小时在网上寻找解决方案后无济于事我决定重建服务器.

我用Debian 7.5,rabbitmq 2.8.4,芹菜3.1.13(Cipater)重建了新的服务器.大约一个小时左右,一切都工作得很好,直到芹菜开始再次抱怨它无法连接到rabbitmq!

[2014-08-06 05:17:21,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 6.00 seconds...
Run Code Online (Sandbox Code Playgroud)

我重新启动了rabbitmq service rabbitmq-server start和同样的问题:

rabbitmq开始再次膨胀,不断冲击cpu并慢慢接管所有ram并交换:

PID USER      PR  NI  VIRT  RES  SHR S  %CPU %MEM    TIME+  COMMAND
21823 rabbitmq  20   0  908m 488m 3900 S 731.2 49.4   9:44.74 beam.smp
Run Code Online (Sandbox Code Playgroud)

结果rabbitmqctl status如下:

Status of node 'rabbit@li370-61' ...
[{pid,21823},
 {running_applications,[{rabbit,"RabbitMQ","2.8.4"},
                        {os_mon,"CPO  CXC 138 46","2.2.9"},
                        {sasl,"SASL  CXC 138 11","2.2.1"},
                        {mnesia,"MNESIA  CXC 138 12","4.7"},
                        {stdlib,"ERTS  CXC 138 10","1.18.1"},
                        {kernel,"ERTS  CXC …
Run Code Online (Sandbox Code Playgroud)

erlang debian mnesia rabbitmq celery

47
推荐指数
2
解决办法
7万
查看次数

如何从其他渠道恢复未确认的AMQP消息,而不是我自己的连接?

似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on …
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq celery celeryd pika

46
推荐指数
3
解决办法
5万
查看次数