When I try to send a task to the broker (RabbitMQ), it hangs.
# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.
If I run the task synchronously, it works as expected.
# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2
If I interrupt the .apply_async() call with Ctrl+C, I get a traceback with some clues:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
host, port, family, socket.SOCK_STREAM, SOL_TCP)
File "/usr/local/lib/python3.7/socket.py", line 752, …
Celery was working fine until, one day, the command-line worker failed to start with the following traceback:
Traceback (most recent call last):
File "/home/buildslave/venv/bin/celery", line 9, in <module>
load_entry_point('celery==3.0.7', 'console_scripts', 'celery')()
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/__main__.py", line 14, in main
main()
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 942, in main
cmd.execute_from_commandline(argv)
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 886, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/base.py", line 175, in execute_from_commandline
return self.handle_argv(prog_name, argv[1:])
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 881, in handle_argv
return self.execute(command, argv)
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 856, in execute
return cls(app=self.app).run_from_argv(self.prog_name, argv)
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 142, in run_from_argv
return self(*args, **options)
File "/home/buildslave/venv/local/lib/python2.7/site-packages/celery/bin/celery.py", line 112, in __call__
ret …
I am using Kombu with RabbitMQ to implement the classic publish/subscribe design pattern. I created a producer that creates a topic:
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])
Then I created a consumer to consume from that producer:
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
I then start two consumers, each in a separate terminal; both wait for messages:
terminal 1: python consumer.py
terminal 2: python …
I am trying to spawn some background celery beat processes using docker-compose, but they are not working anymore. My configuration:
docker-compose-dev.yml
worker-periodic:
  image: dev3_web
  restart: always
  volumes:
    - ./services/web:/usr/src/app
    - ./services/web/celery_logs:/usr/src/app/celery_logs
  command: celery beat -A celery_worker.celery --schedule=/tmp/celerybeat-schedule --loglevel=DEBUG --pidfile=/tmp/celerybeat.pid
  environment:
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_RESULT_BACKEND=redis://redis:6379/0
    - FLASK_ENV=development
    - APP_SETTINGS=project.config.DevelopmentConfig
    - DATABASE_URL=postgres://postgres:postgres@web-db:5432/web_dev
    - DATABASE_TEST_URL=postgres://postgres:postgres@web-db:5432/web_test
    - SECRET_KEY=my_precious
  depends_on:
    - web
    - redis
    - web-db
  links:
    - redis:redis
    - web-db:web-db
After I start the containers, $ docker ps gives me the following (note that worker-periodic_1 was always started only seconds ago):
697322a621d5 dev3_web "celery worker -A ce…" 24 hours ago Up 5 minutes dev3_worker-analysis_1
d8e414aa4e5b dev3_web "celery worker -A ce…" 24 …
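A container that is always "Up" for only a few seconds suggests a restart loop. One common cause (an assumption here, not confirmed by the logs shown) is a stale pidfile surviving the restart: with restart: always the container's /tmp is preserved, beat refuses to start because /tmp/celerybeat.pid already exists, exits, and is restarted again. A sketch of a workaround is to clear the pidfile before launching beat:

```yaml
# docker-compose-dev.yml (fragment): remove a stale pidfile before start
command: >
  sh -c "rm -f /tmp/celerybeat.pid &&
         celery beat -A celery_worker.celery
         --schedule=/tmp/celerybeat-schedule
         --loglevel=DEBUG --pidfile=/tmp/celerybeat.pid"
```

Checking the container's own output with docker logs dev3_worker-periodic_1 would confirm whether the pidfile (or something else, such as the broker not being reachable yet) is the actual cause.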
Everything was fine until:
celery beat v3.1.18 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> amqp://user:**@staging-api.user-app.com:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> /tmp/beat.db
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2015-09-25 17:29:24,453: INFO/MainProcess] beat: Starting...
[2015-09-25 17:29:24,457: CRITICAL/MainProcess] beat raised exception <class 'EOFError'>: EOFError('Ran out of input',)
Traceback (most recent call last):
File "/home/user/staging/venv/lib/python3.4/site-packages/kombu/utils/__init__.py", line 320, in __get__
return obj.__dict__[self.__name__]
KeyError: 'scheduler'
During handling of the above exception, another exception …
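"EOFError: Ran out of input" on startup almost always means the PersistentScheduler's state file (the "db -> /tmp/beat.db" line above) was truncated or corrupted, for example by an unclean shutdown; deleting the file lets beat rebuild it on the next start. A small sketch reproducing the failure mode with a plain pickle file (the scheduler persists its state via shelve, which unpickles the stored data when it opens the file):

```python
import os
import pickle
import tempfile

# Simulate a truncated scheduler state file: unpickling it raises the
# same "Ran out of input" EOFError that beat reports on startup.
state_file = os.path.join(tempfile.mkdtemp(), 'beat.db')
open(state_file, 'wb').close()            # zero-byte file

error = None
try:
    with open(state_file, 'rb') as f:
        pickle.load(f)
except EOFError as exc:
    error = str(exc)
    os.remove(state_file)                 # the fix: delete the corrupt file

print(error)
```

After removing the corrupt file (rm /tmp/beat.db and any companion files the dbm backend created), restarting beat recreates a fresh schedule database.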
I get the error

ImportError: cannot import name OrderedDict

when installing the server locally on my Ubuntu 14.04 machine. I have already tried installing ordereddict manually and also upgrading kombu, but it still gives the error. The full traceback is added below:
rishav@Swastik:~/open-event-orga-server$ sudo python create_db.py
Traceback (most recent call last):
  File "create_db.py", line 5, in <module>
    from app import current_app
  File "/home/rishav/open-event-orga-server/app/__init__.py", line 23, in <module>
    from celery import Celery
  File "/usr/local/lib/python2.7/dist-packages/celery/__init__.py", line 133, in <module>
    from celery import five  # noqa
  File "/usr/local/lib/python2.7/dist-packages/celery/five.py", line 153, in <module>
    from kombu.utils.compat import OrderedDict  # noqa
ImportError: cannot import name OrderedDict
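This traceback points at a celery/kombu version mismatch: celery's five.py imports an OrderedDict compatibility shim from kombu.utils.compat, and newer kombu releases removed that shim because the class has lived in the standard library since Python 2.7. Pinning kombu to a release that still ships the shim (or upgrading celery) resolves it; the exact compatible version bounds depend on the installed celery release. For reference, the standard-library import that replaced the shim:

```python
# OrderedDict has been in the stdlib since Python 2.7 / 3.1, which is
# why newer kombu dropped its compat shim.
from collections import OrderedDict

d = OrderedDict([('b', 2), ('a', 1)])
print(list(d))   # insertion order is preserved
```

So upgrading kombu alone makes the problem worse here; the pair of versions has to be kept in step.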
I am using kombu to manage RabbitMQ with a producer/consumer model. I start my producer, and it puts 100 jobs on the queue (I have just one queue and one exchange). I want to start multiple consumers at the same time and have each one process a single job at a time. Unfortunately, the consumers block each other (i.e. while one consumer grabs a job from the queue, the others just sit idle). If I kill the working consumer, one of the other consumers kicks in and starts working. Is there a way to have all the consumers running concurrently, each processing a different job from the queue? My consumer code is as follows:
def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks = []
        queues = []
        callbacks.append(self._callback)
        queues.append(self.incoming_queue)
        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()
        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message
kombu ×7
celery ×3
python ×3
rabbitmq ×3
celerybeat ×2
amqp ×1
docker ×1
flask ×1
pika ×1
pip ×1
python-2.7 ×1
redis ×1
virtualenv ×1