Reu*_*ani 9 python rabbitmq python-2.7 pika
我想与阻塞同步使用队列(RabbitMQ).
注意:下面是准备好运行的完整代码.
系统设置使用RabbitMQ作为其排队系统,但我们的一个模块不需要异步消耗.
我尝试在BlockingConnection上使用basic_get ,它不会阻塞((None, None, None)立即返回):
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
Run Code Online (Sandbox Code Playgroud)
我也尝试使用消耗生成器,在长时间不消耗后,"连接已关闭"失败.
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
Run Code Online (Sandbox Code Playgroud)
有没有办法像使用Queue.Queuepython 一样使用pika客户端使用RabbitMQ ?或类似的东西?
我现在的选择是忙等待(使用basic_get) - 但我宁愿使用现有系统来忙 - 等待,如果可能的话.
完整代码:
#!/usr/bin/env python
import pika
import time
TEST_QUEUE = 'test'
def get_connection():
# define connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=YOUR_IP,
port=YOUR_PORT,
credentials=pika.PlainCredentials(
username=YOUR_USER,
password=YOUR_PASSWORD,
)
)
)
return connection
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
print "blocking_get_1"
blocking_get_1()
print "blocking_get_2"
blocking_get_2()
get_connection().channel().queue_delete(TEST_QUEUE)
Run Code Online (Sandbox Code Playgroud)
ean*_*son 12
Pika的一个常见问题是它目前没有在后台处理传入的事件.这基本上意味着在许多情况下,您需要connection.process_data_events()定期调用以确保它不会错过心跳.
这也意味着如果你长时间睡眠,pika将不会处理传入的数据,并最终死亡,因为它没有响应心跳.这里的一个选项是禁用心跳.
我通常通过在后台检查新事件来解决这个问题,如本例所示.
如果你想完全阻止我会做这样的事情(基于我自己的库AMQP-Storm).
while True:
result = channel.basic.get(queue='simple_queue', no_ack=False)
if result:
print("Message:", result['body'])
channel.basic.ack(result['method']['delivery_tag'])
else:
print("Channel Empty.")
sleep(1)
Run Code Online (Sandbox Code Playgroud)
这是基于此处的示例.
| 归档时间: |
|
| 查看次数: |
6445 次 |
| 最近记录: |