标签: pika

RabbitMQ,Pika和重新连接策略

我正在使用Pika处理来自RabbitMQ的数据.由于我似乎遇到了不同类型的问题,我决定编写一个小型测试应用程序来查看如何处理断开连接.

我写了这个测试应用程序,其中包括:

  1. 连接到Broker,重试直到成功
  2. 连接时创建队列.
  3. 使用此队列并将结果放入python Queue.Queue(0)
  4. 从Queue.Queue(0)获取项目并将其生成回代理队列.

我注意到的是2个问题:

  1. 当我从一台连接到另一台主机上的rabbitmq的主机(在vm内)运行我的脚本时,这些脚本随机退出而不会产生错误.
  2. 当我在安装RabbitMQ的同一主机上运行我的脚本时,它运行正常并继续运行.

这可能是因为网络问题,数据包丢失,但我发现连接不是很强大.

当脚本在RabbitMQ服务器上本地运行并且我杀死RabbitMQ时,脚本退出时出现错误:"ERROR pika SelectConnection:3:104上的套接字错误"

所以看起来我不能让重新连接策略按原样运行.有人可以查看代码,看看我做错了什么?

谢谢,

松鸦

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

15
推荐指数
1
解决办法
2万
查看次数

无法找到记录器"pika.adapters.blocking_connection"的处理程序

类似的问题似乎都是基于使用自定义记录器,我很高兴只使用默认/无.我的pika python应用程序运行并接收消息,但几秒钟后崩溃No handlers could be found for logger "pika.adapters.blocking_connection",任何想法?

import pika

credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare('messages')

def message_received(channel, method, properties, body):
    print "[x] Received %r" % (body)

channel.basic_consume(message_received, queue='messages', no_ack=True)

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

通过添加修复:

import logging
logging.basicConfig()
Run Code Online (Sandbox Code Playgroud)

python amqp pika

15
推荐指数
1
解决办法
7725
查看次数

如何重新连接RabbitMQ?

我的python脚本一旦从另一个数据源收到消息,就不得不向RabbitMQ发送消息.python脚本发送它们的频率可以变化,例如1分钟到30分钟.

以下是我与RabbitMQ建立连接的方法:

  rabt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
  channel = rbt_conn.channel()
Run Code Online (Sandbox Code Playgroud)

我刚才有例外

pika.exceptions.ConnectionClosed
Run Code Online (Sandbox Code Playgroud)

我该如何重新连接呢?什么是最好的方式?有没有"策略"?是否有能力发送ping以保持连接活动或设置超时?

任何指针将不胜感激.

python rabbitmq pika

13
推荐指数
2
解决办法
9363
查看次数

任何人都可以告诉我python中的pika和kombu消息库之间有什么区别?

我想在我的应用程序中使用消息库来与rabbitmq进行交互.谁能解释一下pika和kombu图书馆之间的区别?

amqp rabbitmq python-2.7 kombu pika

13
推荐指数
1
解决办法
3036
查看次数

有没有办法通过鼠兔在rabbitmq中列出队列?

我知道我们可以这样做以列出rabbitmq中的队列.

rabbitmqctl list_queues
Run Code Online (Sandbox Code Playgroud)

但我怎么能通过鼠兔做到这一点?

python queue rabbitmq pika

12
推荐指数
1
解决办法
6082
查看次数

RabbitMQ在处理长时间运行的任务时关闭连接,超时设置产生错误

我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.

从研究中我了解到心跳增加的连接超时 可以用来解决这个问题.这两种解决方案在尝试时都会引发错误.在阅读类似帖子的答案时,我还了解到,自发布答案后,RabbitMQ已经实施了许多更改(例如,默认心跳超时已从RabbitMQ 3.5.5之前的580更改为60).

指定心跳和阻止的连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
Run Code Online (Sandbox Code Playgroud)

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
Run Code Online (Sandbox Code Playgroud)

heartbeat_interval=1000在连接参数中指定时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

同样,socket_timeout = 1000显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我在Ubuntu 14.04上运行RabbitMQ 3.6.1,pika 0.10.0和python 2.7.

  1. 为什么上述方法会产生错误?
  2. 在有长期连续任务的情况下,是否可以使用心跳方法?例如,在执行需要30分钟以上的大型数据库连接时,是否可以使用心跳?我赞成心跳方法,很多时候很难判断数据库连接等任务需要多长时间.

我已经阅读了类似问题的答案

更新:从pika文档中运行代码会产生相同的错误.

python amqp rabbitmq pika python-pika

11
推荐指数
1
解决办法
3457
查看次数

在Pika或RabbitMQ中,如何检查是否有消费者正在消费?

我想检查消费者/工人是否在场消费我即将发送的消息.

如果没有任何工人,我会启动一些工作人员(消费者和发布者都在一台机器上),然后继续发布消息.

如果有类似的函数connection.check_if_has_consumers,我会像这样实现它 -

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                            properties=pika.BasicProperties(delivery_mode=2)) …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

10
推荐指数
1
解决办法
7977
查看次数

Python和RabbitMQ - 从多个渠道收听消费事件的最佳方式?

我有两个独立的RabbitMQ实例.我正试图找到听取两者事件的最佳方式.

例如,我可以使用以下内容在一个上使用事件:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

我有第二个主持人,"host2",我也想听.我想创建两个单独的线程来做到这一点,但从我读过的,pika不是线程安全的.有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的Rabbit实例(host1和host2)就足够了?

python rabbitmq pika

10
推荐指数
1
解决办法
2万
查看次数

使用鼠兔在RabbitMQ中同步和阻塞消耗

我想与阻塞同步使用队列(RabbitMQ).

注意:下面是准备好运行的完整代码.

系统设置使用RabbitMQ作为其排队系统,但我们的一个模块不需要异步消耗.

我尝试在BlockingConnection上使用basic_get ,它不会阻塞((None, None, None)立即返回):

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)
Run Code Online (Sandbox Code Playgroud)

我也尝试使用消耗生成器,在长时间不消耗后,"连接已关闭"失败.

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)
Run Code Online (Sandbox Code Playgroud)

有没有办法像使用Queue.Queuepython 一样使用pika客户端使用RabbitMQ ?或类似的东西?

我现在的选择是忙等待(使用basic_get) - 但我宁愿使用现有系统来忙 - 等待,如果可能的话.

完整代码:

#!/usr/bin/env python
import pika
import time …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq python-2.7 pika

9
推荐指数
1
解决办法
6445
查看次数

如何在有限的时间内通过鼠兔消费RabbitMQ消息?

pika教程中的所有示例都以客户端调用结束start_consuming(),这会启动无限循环.这些例子对我有用.

但是,我不希望我的客户端永远运行.相反,我需要我的客户端消耗消息一段时间,例如15分钟,然后停止.

我该如何做到这一点?

python rabbitmq python-2.7 pika

9
推荐指数
1
解决办法
4130
查看次数

标签 统计

pika ×10

python ×9

rabbitmq ×9

amqp ×3

python-2.7 ×3

kombu ×1

python-pika ×1

queue ×1