标签: pika

Pika RabbitMQ客户端的异步消息处理

Pika timed收到示例之后,我希望有一个客户端处理更多的并发请求.我的问题是,如果handle_delivery可能以某种方式被称为每次接收新邮件,而不是等待前一个handle_delivery回报?

python rabbitmq pika

6
推荐指数
1
解决办法
3582
查看次数

如何使用pika与Python客户端连接到RabbitMQ集群?

我有一个使用Pika包(0.9.13)的Python客户端,并从RabbitMQ集群中的一个节点检索数据.群集由放置在两个不同主机(url_1和url_2)中的两个节点组成.如何让我的Python客户端订阅这两个节点?

这是我的代码的主要结构:

import pika
credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1,
                                     credentials=credentials, ssl=ssl, port=port))
channel = connection.channel() 
channel.exchange_declare(exchange=exchange.name, 
                         type=exchange.type, durable=exchange.durable)

result = channel.queue_declare(queue=queue.name, exclusive=queue.exclusive, 
                             durable=queue.durable, auto_delete=queue.autoDelete)
channel.queue_bind(exchange=exchange.name, queue=queue.name, 
                   routing_key=binding_key)
channel.basic_consume(callback,
                  queue=queue.name,
                  no_ack=True)

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

python cluster-computing rabbitmq pika

6
推荐指数
1
解决办法
5054
查看次数

在网络应用程序中使用Pika BlockingConnection是否可以?

我有点困惑BlockingConnectionAsyncoreConnection.我想从Django应用程序向RabbitMQ队列发送一些消息.使用全局BlockingConnection对象可以吗?

谢谢.

python django rabbitmq pika blockingcollection

6
推荐指数
1
解决办法
1217
查看次数

如何在python中做一个简单的Pika SelectConnection来发送消息?

我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息。我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦。

在我使用 amqp 库的旧代码中,我只是声明了一个这样的类:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 
Run Code Online (Sandbox Code Playgroud)

然后在我的代码中的其他地方调用 sendMQ("this is my message"),然后代码继续。我不需要听确认等。

有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ(“这是我的消息”)发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt。我想我只是不确定如何让我的代码在没有所有这些 try/excepts …

python rabbitmq pika

6
推荐指数
2
解决办法
3807
查看次数

RabbitMQ def回调(通道,方法,属性,主体)

只想知道worker.py文件中的参数名称含义:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
Run Code Online (Sandbox Code Playgroud)

ch,方法,属性意味着什么?

rabbitmq pika python-pika

6
推荐指数
1
解决办法
1820
查看次数

为什么用 RabbitMQ 发布持久消息这么慢?

当使用 Delivery_mode=1 发布消息(到持久队列)时,RabbitMQ 管理面板每秒显示大约 2500 条消息,但不幸的是,当我尝试使用 Delivery_mode=2 发布相同的消息以使它们持久化时,传入消息速率下降到 15-20每秒消息数。(使用Python 2.7和pika库)

每条消息都包含一个 URL,因此它们的大小也很小...而且我使用 basic_publish() 进行发布。

管理面板上的节点统计显示以下数据;

文件描述符:55 / 1024 个可用

套接字描述符:1 / 829 可用

Erlang 进程:248 / 1048576 可用

内存:277MB/12GB高水位

磁盘空间:1.6TB/48MB低水位线

另外,I/O 统计信息(每次操作的 I/O 平均时间)如下所示;读取:3.0ms

写入:0.13ms

寻道:0.05 毫秒

同步:70ms

这是 RabbitMQ 上持久队列的正常行为还是我做错了什么或者我可以做一些调整性能的事情?

提前非常感谢您...

python amqp rabbitmq pika

6
推荐指数
0
解决办法
2663
查看次数

RabbitMQ破坏了管道错误或丢失了消息

使用pika库BlockingConnection连接到RabbitMQ,我偶尔会在发布消息时出错:

致命套接字错误:错误(32,'破管')

这是一个非常简单的子进程,它从内存中的队列中获取一些信息,并将一个小的JSON消息发送到AMQP.当系统几分钟没有发送任何消息时,似乎只出现错误.

建立:

connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)
Run Code Online (Sandbox Code Playgroud)

入队代码捕获任何连接错误并重试:

def _enqueue(self, message_id, data):
    try:
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,
                message_id=message_id
            )
        )

        # Confirm delivery or retry
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout) as e:
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data) …
Run Code Online (Sandbox Code Playgroud)

python amqp rabbitmq pika

6
推荐指数
1
解决办法
3854
查看次数

使用ExternalCredentials验证rabbitmq

我有一个rabbitmq 服务器,并使用pika 库和Python 来生成/使用消息。出于开发目的,我只是使用

credentials = pika.PlainCredentials(<user-name>, <password>)

我想将其更改为使用 pika.ExternalCredentials 或 TLS。

我已将rabbitmq 服务器设置为在端口 5671 上侦听 TLS,并已正确配置它。我能够从本地主机与rabbitmq进行通信,但是当我尝试从本地主机外部与它进行通信时,它不喜欢这样。我有一种感觉,我的“凭据”是基于rabbitmq中的“来宾”用户。

rabbitmq.config

%% -*- mode: erlang -*-

[
 {rabbit,
  [
   {ssl_listeners, [5671]},
   {auth_mechanisms, ['PLAIN', 'AMQPLAIN', 'EXTERNAL']},
   {ssl_options, [{cacertfile,"~/tls-gen/basic/result/ca_certificate.pem"},
                  {certfile,"~/tls-gen/basic/result/server_certificate.pem"},
                  {keyfile,"~/tls-gen/basic/result/server_key.pem"},
                  {verify,verify_none},
                  {ssl_cert_login_from, common_name},
                  {fail_if_no_peer_cert,false}]}
   
  ]}
].
Run Code Online (Sandbox Code Playgroud)

我可以确认这是有效的,因为在我的rabbitmq日志中我看到:

2019-08-21 15:34:47.663 [info] <0.442.0> started TLS (SSL) listener on [::]:5671
Run Code Online (Sandbox Code Playgroud)

服务器端的一切似乎都已设置完毕,我还生成了证书和所需的所有 .pem 文件。

test_rabbitmq.py

import pika
import ssl
from pika.credentials import ExternalCredentials

context = ssl.create_default_context(cafile="~/tls-gen/basic/result/ca_certificate.pem")
context.load_cert_chain("~/tls-gen/basic/result/client_certificate.pem",
                            "~/tls-gen/basic/result/client_key.pem")
ssl_options = pika.SSLOptions(context, "10.154.0.27")
params = pika.ConnectionParameters(port=5671,ssl_options=ssl_options, credentials = ExternalCredentials()) …
Run Code Online (Sandbox Code Playgroud)

python ssl rabbitmq pika tls1.2

6
推荐指数
1
解决办法
6527
查看次数

Docker、rabbitMQ 和 pike 连接被拒绝

我正在尝试使用 docker-compose 从 docker 内部连接到rabbitMQ。从容器外部运行的程序连接到“image:rabbitmq:3-management”没有问题。

没有设置密码或任何东西。

仔细查看错误消息后,鼠兔似乎尝试使用 IPv6 而不是 IPv4 连接。查看文档我无法找到连接 IPv4 的方法。

故障发生于

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

Error log
rabbitMQ_1  | 2020-12-09 12:41:42.332 [info] <0.685.0> Ready to start client connection listeners
rabbitMQ_1  | 2020-12-09 12:41:42.333 [info] <0.988.0> started TCP listener on [::]:5672
listener_1  | ERROR:pika.adapters.utils.io_services_utils:Socket failed to connect: <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 36782)>; error=111 (Connection refused)
listener_1  | ERROR:pika.adapters.utils.connection_workflow:TCP Connection attempt failed: ConnectionRefusedError(111, 'Connection refused'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
listener_1  | ERROR:pika.adapters.utils.connection_workflow:AMQPConnector - reporting …
Run Code Online (Sandbox Code Playgroud)

rabbitmq pika docker

6
推荐指数
1
解决办法
5090
查看次数

Python - RabbitMQ Pika 消费者 - 如何使用异步函数作为回调

我有以下代码,我在其中初始化侦听队列的消费者。

consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)
Run Code Online (Sandbox Code Playgroud)

问题是consum方法定义如下:

async def consume(self, channel, method, properties, body):
Run Code Online (Sandbox Code Playgroud)

在 Consumer 方法中,我们需要等待异步函数,但这会为 Consumer 函数产生错误“coroutine is not waiting”。有没有办法在 pika 中使用异步函数作为回调?

python rabbitmq pika

6
推荐指数
1
解决办法
4898
查看次数