标签: kombu

Celery 任务未发送给代理

当我尝试将任务发送给代理(RabbitMQ)时,它挂起。

# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.
Run Code Online (Sandbox Code Playgroud)

如果我同步运行该任务,它将按预期工作。

# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2
Run Code Online (Sandbox Code Playgroud)

如果我.apply_async()用 ctrl+c 中断,我会得到一些线索的回溯:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
    host, port, family, socket.SOCK_STREAM, SOL_TCP)
  File "/usr/local/lib/python3.7/socket.py", line 752, …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq celery kombu docker

4
推荐指数
1
解决办法
7044
查看次数

芹菜错误'没有这样的运输:amqp'

Celery工作正常,有一天,命令行工作人员未能启动以下跟踪:

Traceback (most recent call last):
  File "/home/buildslave/venv/bin/celery", line 9, in <module>
    load_entry_point('celery==3.0.7', 'console_scripts', 'celery')()
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/__main__.py", line 14, in main
    main()
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 942, in main
    cmd.execute_from_commandline(argv)
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 886, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/base.py", line 175, in execute_from_commandline
    return self.handle_argv(prog_name, argv[1:])
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 881, in handle_argv
    return self.execute(command, argv)
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 856, in execute
    return cls(app=self.app).run_from_argv(self.prog_name, argv)
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 142, in run_from_argv
    return self(*args, **options)
  File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 112, in __call__
    ret …
Run Code Online (Sandbox Code Playgroud)

python pip virtualenv celery kombu

3
推荐指数
1
解决办法
2641
查看次数

Kombu是否可以发布并订阅多个消费者

将Kombu与RabbitMQ结合使用以实现经典的发布/订阅设计模式。我创建了一个创建主题的生产者:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])
Run Code Online (Sandbox Code Playgroud)

然后,我创建了一个要从发布者那里消费的消费者:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
Run Code Online (Sandbox Code Playgroud)

然后启动两个消费者,每个消费者在一个单独的终端中;两者都等待消息:

terminal 1: python consumer.py
terminal 2: python …
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq kombu

3
推荐指数
1
解决办法
1598
查看次数

无法为交换“reply.celery.pidbox”路由消息:表为空或键不再存在

我正在尝试生成一些背景,celery beat使用 进程docker-compose,但它们不再工作了。我的配置:

docker-compose-dev.yml

worker-periodic:
    image: dev3_web
    restart: always
    volumes:
      - ./services/web:/usr/src/app
      - ./services/web/celery_logs:/usr/src/app/celery_logs

    command: celery beat -A celery_worker.celery --schedule=/tmp/celerybeat-schedule --loglevel=DEBUG --pidfile=/tmp/celerybeat.pid
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - FLASK_ENV=development
      - APP_SETTINGS=project.config.DevelopmentConfig
      - DATABASE_URL=postgres://postgres:postgres@web-db:5432/web_dev 
      - DATABASE_TEST_URL=postgres://postgres:postgres@web-db:5432/web_test
      - SECRET_KEY=my_precious  
    depends_on:
      - web
      - redis
      - web-db
    links:
      - redis:redis
      - web-db:web-db
Run Code Online (Sandbox Code Playgroud)

在我启动容器后,我$ docker ps得到(注意 worker-periodic_1 总是在几秒钟前启动):

697322a621d5        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-analysis_1
d8e414aa4e5b        dev3_web               "celery worker -A ce…"   24 …
Run Code Online (Sandbox Code Playgroud)

redis flask kombu celerybeat docker-compose

3
推荐指数
1
解决办法
2987
查看次数

芹菜击败没有开始EOFError('输出输出')

一切都很好,直到:

celery beat v3.1.18 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://user:**@staging-api.user-app.com:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> /tmp/beat.db
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-09-25 17:29:24,453: INFO/MainProcess] beat: Starting...
[2015-09-25 17:29:24,457: CRITICAL/MainProcess] beat raised exception <class 'EOFError'>: EOFError('Ran out of input',)
Traceback (most recent call last):
  File "/home/user/staging/venv/lib/python3.4/site-packages/kombu/utils/__init__.py", line 320, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'scheduler'

During handling of the above exception, another exception …
Run Code Online (Sandbox Code Playgroud)

celery kombu celerybeat

2
推荐指数
1
解决办法
1557
查看次数

获取ImportError:无法导入名称OrderedDict

我收到错误

ImportError:无法导入名称OrderedDict

在我的ubuntu 14.04机器上本地安装服务器时。
我已经尝试过手动安装ordereddict,也尝试升级kombu,但它仍然给出错误。完整的追溯信息添加在下面-

rishav @ Swastik:〜/ open-event-orga-server $ sudo python create_db.py
追溯(最近一次调用):文件“ create_db.py”,第5行,
来自应用程序导入current_app
文件“ / home / rishav / open” 从celery导入-event-orga-server / app / init .py”,第23行,在
celery中导入Celery
文件“ /usr/local/lib/python2.7/dist-packages/celery/ init .py”,第133行,在
从celery导入五个#noqa文件“ /usr/local/lib/python2.7/dist-packages/celery/five.py”,第153行,
从kombu.utils.compat导入import OrderedDict#noqa
ImportError:无法导入名称OrderedDict

python-2.7 kombu

2
推荐指数
2
解决办法
6533
查看次数

Python Kombu - 阻止

我正在使用kombu通过生产者/消费者模型管理RabbitMQ.我启动了我的生产者,它在队列中放置了100个作业(我只有一个队列和一个交换).我想同时启动多个消费者,让每个消费者一次处理一个工作.不幸的是,消费者互相阻挠(即,当一个消费者从队列中抓取一份工作时,其他消费者只是闲置着).如果我杀死了工作消费者,那么其他一个消费者就会开始工作并开始工作.有没有办法让所有消费者同时运行,每个消费者从队列中处理不同的工作?我的消费者代码如下:

def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message 
Run Code Online (Sandbox Code Playgroud)

python multithreading rabbitmq kombu pika

0
推荐指数
1
解决办法
2156
查看次数