使用kombu重试发布消息的最佳方法是什么?

Fél*_*lix 6 python kombu

我正在测试kombu的工作原理.我打算在几个项目中取代鼠兔.我看到kombu有很多文档,但使用我在文档中找到的一些消息丢失了.这是代码:

from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
     logger.error('Error: %r', exc, exc_info=1)
     logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
   publish({'hello': 'world'}, routing_key='test_queue')
   time.sleep(0.001)
Run Code Online (Sandbox Code Playgroud)

当它发布时我多次关闭连接并继续发布但在队列中有大约60000条消息,所以有很多丢失的消息.

我尝试了不同的替代品,例如:

publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')
Run Code Online (Sandbox Code Playgroud)

谢谢!

Fél*_*lix 10

问题是默认Kombu不使用'确认',你必须使用:

        conn = Connection('amqp://localhost:5672', transport_options={'confirm_publish': True})
Run Code Online (Sandbox Code Playgroud)

谢谢

  • 这与 Pika 中的强制标志不同。如果 RabbitMQ 服务器上没有声明队列,则消息将被发送,并且不会被路由到任何队列,因此将被“忽略”。生产者不会收到任何消息未路由的信号。 (2认同)