继Pika timed收到示例之后,我希望有一个客户端处理更多的并发请求.我的问题是,如果handle_delivery可能以某种方式被称为每次接收新邮件,而不是等待前一个handle_delivery回报?
我有一个使用Pika包(0.9.13)的Python客户端,并从RabbitMQ集群中的一个节点检索数据.群集由放置在两个不同主机(url_1和url_2)中的两个节点组成.如何让我的Python客户端订阅这两个节点?
这是我的代码的主要结构:
import pika
credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1,
credentials=credentials, ssl=ssl, port=port))
channel = connection.channel()
channel.exchange_declare(exchange=exchange.name,
type=exchange.type, durable=exchange.durable)
result = channel.queue_declare(queue=queue.name, exclusive=queue.exclusive,
durable=queue.durable, auto_delete=queue.autoDelete)
channel.queue_bind(exchange=exchange.name, queue=queue.name,
routing_key=binding_key)
channel.basic_consume(callback,
queue=queue.name,
no_ack=True)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud) 我有点困惑BlockingConnection和AsyncoreConnection.我想从Django应用程序向RabbitMQ队列发送一些消息.使用全局BlockingConnection对象可以吗?
谢谢.
我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息。我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦。
在我使用 amqp 库的旧代码中,我只是声明了一个这样的类:
import amqp as amqp
class MQ():
mqConn = None
channel = None
def __init__(self):
self.connect()
def connect(self):
if self.mqConn is None:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
elif not self.mqConn.connected:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
def sendMQ(self, message):
self.connect()
lMessage = amqp.Message(message)
self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q")
Run Code Online (Sandbox Code Playgroud)
然后在我的代码中的其他地方调用 sendMQ("this is my message"),然后代码继续。我不需要听确认等。
有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ(“这是我的消息”)发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt。我想我只是不确定如何让我的代码在没有所有这些 try/excepts …
只想知道worker.py文件中的参数名称含义:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
Run Code Online (Sandbox Code Playgroud)
ch,方法,属性意味着什么?
当使用 Delivery_mode=1 发布消息(到持久队列)时,RabbitMQ 管理面板每秒显示大约 2500 条消息,但不幸的是,当我尝试使用 Delivery_mode=2 发布相同的消息以使它们持久化时,传入消息速率下降到 15-20每秒消息数。(使用Python 2.7和pika库)
每条消息都包含一个 URL,因此它们的大小也很小...而且我使用 basic_publish() 进行发布。
管理面板上的节点统计显示以下数据;
文件描述符:55 / 1024 个可用
套接字描述符:1 / 829 可用
Erlang 进程:248 / 1048576 可用
内存:277MB/12GB高水位
磁盘空间:1.6TB/48MB低水位线
另外,I/O 统计信息(每次操作的 I/O 平均时间)如下所示;读取:3.0ms
写入:0.13ms
寻道:0.05 毫秒
同步:70ms
这是 RabbitMQ 上持久队列的正常行为还是我做错了什么或者我可以做一些调整性能的事情?
提前非常感谢您...
使用pika库BlockingConnection连接到RabbitMQ,我偶尔会在发布消息时出错:
致命套接字错误:错误(32,'破管')
这是一个非常简单的子进程,它从内存中的队列中获取一些信息,并将一个小的JSON消息发送到AMQP.当系统几分钟没有发送任何消息时,似乎只出现错误.
建立:
connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
exchange='xyz',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False
)
Run Code Online (Sandbox Code Playgroud)
入队代码捕获任何连接错误并重试:
def _enqueue(self, message_id, data):
try:
published = self.channel.basic_publish(
self.amqp_exchange,
self.amqp_routing_key,
json.dumps(data),
pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message_id
)
)
# Confirm delivery or retry
if published:
self.retry_count = 0
else:
raise EnqueueException("Message publish not confirmed.")
except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
pika.exceptions.UnroutableError, socket.timeout) as e:
self.retry_count += 1
if self.retry_count < 5:
logging.warning("Reconnecting and resending")
if self.connection.is_open:
self.connection.close()
self.connect()
self._enqueue(message_id, data) …Run Code Online (Sandbox Code Playgroud) 我有一个rabbitmq 服务器,并使用pika 库和Python 来生成/使用消息。出于开发目的,我只是使用
credentials = pika.PlainCredentials(<user-name>, <password>)
我想将其更改为使用 pika.ExternalCredentials 或 TLS。
我已将rabbitmq 服务器设置为在端口 5671 上侦听 TLS,并已正确配置它。我能够从本地主机与rabbitmq进行通信,但是当我尝试从本地主机外部与它进行通信时,它不喜欢这样。我有一种感觉,我的“凭据”是基于rabbitmq中的“来宾”用户。
%% -*- mode: erlang -*-
[
{rabbit,
[
{ssl_listeners, [5671]},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN', 'EXTERNAL']},
{ssl_options, [{cacertfile,"~/tls-gen/basic/result/ca_certificate.pem"},
{certfile,"~/tls-gen/basic/result/server_certificate.pem"},
{keyfile,"~/tls-gen/basic/result/server_key.pem"},
{verify,verify_none},
{ssl_cert_login_from, common_name},
{fail_if_no_peer_cert,false}]}
]}
].
Run Code Online (Sandbox Code Playgroud)
我可以确认这是有效的,因为在我的rabbitmq日志中我看到:
2019-08-21 15:34:47.663 [info] <0.442.0> started TLS (SSL) listener on [::]:5671
Run Code Online (Sandbox Code Playgroud)
服务器端的一切似乎都已设置完毕,我还生成了证书和所需的所有 .pem 文件。
import pika
import ssl
from pika.credentials import ExternalCredentials
context = ssl.create_default_context(cafile="~/tls-gen/basic/result/ca_certificate.pem")
context.load_cert_chain("~/tls-gen/basic/result/client_certificate.pem",
"~/tls-gen/basic/result/client_key.pem")
ssl_options = pika.SSLOptions(context, "10.154.0.27")
params = pika.ConnectionParameters(port=5671,ssl_options=ssl_options, credentials = ExternalCredentials()) …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 docker-compose 从 docker 内部连接到rabbitMQ。从容器外部运行的程序连接到“image:rabbitmq:3-management”没有问题。
没有设置密码或任何东西。
仔细查看错误消息后,鼠兔似乎尝试使用 IPv6 而不是 IPv4 连接。查看文档我无法找到连接 IPv4 的方法。
故障发生于
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
Error log
rabbitMQ_1 | 2020-12-09 12:41:42.332 [info] <0.685.0> Ready to start client connection listeners
rabbitMQ_1 | 2020-12-09 12:41:42.333 [info] <0.988.0> started TCP listener on [::]:5672
listener_1 | ERROR:pika.adapters.utils.io_services_utils:Socket failed to connect: <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 36782)>; error=111 (Connection refused)
listener_1 | ERROR:pika.adapters.utils.connection_workflow:TCP Connection attempt failed: ConnectionRefusedError(111, 'Connection refused'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
listener_1 | ERROR:pika.adapters.utils.connection_workflow:AMQPConnector - reporting …Run Code Online (Sandbox Code Playgroud) 我有以下代码,我在其中初始化侦听队列的消费者。
consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)
Run Code Online (Sandbox Code Playgroud)
问题是consum方法定义如下:
async def consume(self, channel, method, properties, body):
Run Code Online (Sandbox Code Playgroud)
在 Consumer 方法中,我们需要等待异步函数,但这会为 Consumer 函数产生错误“coroutine is not waiting”。有没有办法在 pika 中使用异步函数作为回调?