为什么RabbitMQ不会在持久队列上持久化消息?

hek*_*ran 11 python django rabbitmq celery

我通过Celery使用RabbitMQ和Django.我正在使用最基本的设置:

# RabbitMQ connection settings
BROKER_HOST = 'localhost'
BROKER_PORT = '5672'
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'
Run Code Online (Sandbox Code Playgroud)

我导入了Celery任务并将其排队等一年后运行.来自iPython shell:

In [1]: from apps.test_app.tasks import add

In [2]: dt=datetime.datetime(2012, 2, 18, 10, 00)

In [3]: add.apply_async((10, 6), eta=dt)
DEBUG:amqplib:Start from server, version: 8.0, properties: {u'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': 'RabbitMQ', u'version': '2.2.0', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'platform': 'Erlang/OTP'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
DEBUG:amqplib:Open OK! known_hosts []
DEBUG:amqplib:using channel_id: 1
DEBUG:amqplib:Channel open
DEBUG:amqplib:Closed channel #1
Out[3]: <AsyncResult: cfc507a1-175f-438e-acea-8c989a120ab3>
Run Code Online (Sandbox Code Playgroud)

RabbitMQ在芹菜队列中收到此消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
KTMacBook.local.celeryd.pidbox  0   false
celery  1   true
celeryctl_KTMacBook.local   0   true
...done.
Run Code Online (Sandbox Code Playgroud)

然后我通过点击control-C然后'a'中止来杀死RabbitMQ.当我再次启动服务器并使用rabbitmqctl进行检查时,它表示芹菜队列中没有消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
celery  0   true
celeryctl_KTMacBook.local   0   true
...done.
Run Code Online (Sandbox Code Playgroud)

芹菜队列很耐用.为什么消息不会持续存在?我需要做些什么来使消息持久化?

Dan*_*man 20

使队列持久是与使其上的消息持久化不同的.持久的队列意味着当服务器重新启动时它们会自动再次出现 - 这在你的情况下显然已经发生了.但这不会影响消息本身.

要使消息持久化,您还必须将消息的delivery_mode属性标记为2.有关完整说明,请参阅经典写作Rabbits和Warrens.

编辑:完整链接已损坏,但截至2013年12月,您仍可以从主网址找到博文:http://blogs.digitar.com/jjww/


ask*_*sol 5

要查找delivery_mode可以使用它的消息并查看消息属性:

>>> from tasks import add
>>> add.delay(2, 2)

>>> from celery import current_app
>>> conn = current_app.broker_connection()
>>> consumer = current_app.amqp.get_task_consumer(conn)

>>> messages = []
>>> def callback(body, message):
...     messages.append(message)
>>> consumer.register_callback(callback)
>>> consumer.consume()

>>> conn.drain_events(timeout=1)

>>> messages[0].properties
>>> messages[0].properties
{'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary',    'content_type': u'application/x-python-serialize'}
Run Code Online (Sandbox Code Playgroud)

  • 我确认交付模式设置为2.我能够通过将RabbitMQ升级到2.3.1来实现它.使用RabbitMQ 2.2.0时,我遇到了持久性问题. (2认同)