我正在使用Pika处理来自RabbitMQ的数据.由于我似乎遇到了不同类型的问题,我决定编写一个小型测试应用程序来查看如何处理断开连接.
我写了这个测试应用程序,其中包括:
我注意到的是2个问题:
这可能是因为网络问题,数据包丢失,但我发现连接不是很强大.
当脚本在RabbitMQ服务器上本地运行并且我杀死RabbitMQ时,脚本退出时出现错误:"ERROR pika SelectConnection:3:104上的套接字错误"
所以看起来我不能让重新连接策略按原样运行.有人可以查看代码,看看我做错了什么?
谢谢,
松鸦
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant …Run Code Online (Sandbox Code Playgroud) 类似的问题似乎都是基于使用自定义记录器,我很高兴只使用默认/无.我的pika python应用程序运行并接收消息,但几秒钟后崩溃No handlers could be found for logger "pika.adapters.blocking_connection",任何想法?
import pika
credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare('messages')
def message_received(channel, method, properties, body):
print "[x] Received %r" % (body)
channel.basic_consume(message_received, queue='messages', no_ack=True)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
通过添加修复:
import logging
logging.basicConfig()
Run Code Online (Sandbox Code Playgroud) 我的python脚本一旦从另一个数据源收到消息,就不得不向RabbitMQ发送消息.python脚本发送它们的频率可以变化,例如1分钟到30分钟.
以下是我与RabbitMQ建立连接的方法:
rabt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
channel = rbt_conn.channel()
Run Code Online (Sandbox Code Playgroud)
我刚才有例外
pika.exceptions.ConnectionClosed
Run Code Online (Sandbox Code Playgroud)
我该如何重新连接呢?什么是最好的方式?有没有"策略"?是否有能力发送ping以保持连接活动或设置超时?
任何指针将不胜感激.
我想在我的应用程序中使用消息库来与rabbitmq进行交互.谁能解释一下pika和kombu图书馆之间的区别?
我知道我们可以这样做以列出rabbitmq中的队列.
rabbitmqctl list_queues
Run Code Online (Sandbox Code Playgroud)
但我怎么能通过鼠兔做到这一点?
我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.
从研究中我了解到心跳或增加的连接超时 可以用来解决这个问题.这两种解决方案在尝试时都会引发错误.在阅读类似帖子的答案时,我还了解到,自发布答案后,RabbitMQ已经实施了许多更改(例如,默认心跳超时已从RabbitMQ 3.5.5之前的580更改为60).
指定心跳和阻止的连接超时时:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
Run Code Online (Sandbox Code Playgroud)
显示以下错误:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
Run Code Online (Sandbox Code Playgroud)
heartbeat_interval=1000在连接参数中指定时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
同样,socket_timeout = 1000显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
我在Ubuntu 14.04上运行RabbitMQ 3.6.1,pika 0.10.0和python 2.7.
我已经阅读了类似问题的答案
我想检查消费者/工人是否在场消费我即将发送的消息.
如果没有任何工人,我会启动一些工作人员(消费者和发布者都在一台机器上),然后继续发布消息.
如果有类似的函数connection.check_if_has_consumers,我会像这样实现它 -
import pika
import workers
# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
# start the workers in other processes, using python's `multiprocessing`
workers.start_workers()
# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
properties=pika.BasicProperties(delivery_mode=2)) …Run Code Online (Sandbox Code Playgroud) 我有两个独立的RabbitMQ实例.我正试图找到听取两者事件的最佳方式.
例如,我可以使用以下内容在一个上使用事件:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
我有第二个主持人,"host2",我也想听.我想创建两个单独的线程来做到这一点,但从我读过的,pika不是线程安全的.有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的Rabbit实例(host1和host2)就足够了?
我想与阻塞同步使用队列(RabbitMQ).
注意:下面是准备好运行的完整代码.
系统设置使用RabbitMQ作为其排队系统,但我们的一个模块不需要异步消耗.
我尝试在BlockingConnection上使用basic_get ,它不会阻塞((None, None, None)立即返回):
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
Run Code Online (Sandbox Code Playgroud)
我也尝试使用消耗生成器,在长时间不消耗后,"连接已关闭"失败.
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
Run Code Online (Sandbox Code Playgroud)
有没有办法像使用Queue.Queuepython 一样使用pika客户端使用RabbitMQ ?或类似的东西?
我现在的选择是忙等待(使用basic_get) - 但我宁愿使用现有系统来忙 - 等待,如果可能的话.
完整代码:
#!/usr/bin/env python
import pika
import time …Run Code Online (Sandbox Code Playgroud) pika教程中的所有示例都以客户端调用结束start_consuming(),这会启动无限循环.这些例子对我有用.
但是,我不希望我的客户端永远运行.相反,我需要我的客户端消耗消息一段时间,例如15分钟,然后停止.
我该如何做到这一点?