如何在python中做一个简单的Pika SelectConnection来发送消息?

The*_*ear 6 python rabbitmq pika

我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息。我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦。

在我使用 amqp 库的旧代码中,我只是声明了一个这样的类:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 
Run Code Online (Sandbox Code Playgroud)

然后在我的代码中的其他地方调用 sendMQ("this is my message"),然后代码继续。我不需要听确认等。

有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ(“这是我的消息”)发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt。我想我只是不确定如何让我的代码在没有所有这些 try/excepts 的情况下继续运行......此外,我不确定如何通过所有回调传递我的消息......

任何帮助表示赞赏!

谢谢。

Ter*_*Sun 5

整个事情都是回调驱动的,因为它是一种异步的做事方式。异步消费者很容易理解,我们可以通过提供回调函数来获取消息。然而,至少对于初学者来说,发布者部分有点难以理解。

通常我们需要一个队列来进行通信,发布者定期从中获取数据。

使用 SelectConnection 的关键是将您的发布消息函数注册到事件循环中,这可以通过connection.add_timeout. 完成发布后,注册下一轮发布。

下一个问题是在哪里放置初始注册。初始注册可以在通道打开回调中完成。

下面是一个代码片段,以便更好地理解。请注意,它不是生产就绪的。因为它只以每秒 10 条的最大速度发布消息。您需要调整发布间隔并在一次回调中发布更多消息。

class MQ(Object):
    def __init___(self, queue):
        self.queue = queue
    def on_channel_open(self, chn):
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def schedule_next_message(self):
        try:
            msg = self.queue.get(True, 0.01)
            self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
        except Queue.Empty:
            pass
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def on_open(self, conn):
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)
    def run(self):
        # create a connection
        self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print("exception in publisher")
            self.connection.close()
            self.connection.ioloop.start()
Run Code Online (Sandbox Code Playgroud)

将 MQ(queue).run() 放在一个单独的线程中,每当你想把消息放入 mq 时,只需将它放入队列对象中即可。


ede*_*ora -4

作为第一种方法,我建议您从帖子末尾提供的发布/订阅示例开始。一旦您理解了这个简单的示例,就可以开始按照最后的代码块之前提供的教程进行操作。该教程有 6 个不同的用例及其 Python 示例。通过前 5 个步骤,您将了解其工作原理。您应该清楚交换器(将消息路由到每个队列的实体)、绑定密钥(用于连接交换器和队列的密钥)、路由密钥(与发布者和消息一起发送的密钥)的概念。交换器使用它来将消息路由到一个队列或另一个队列)和队列(可以存储消息的缓冲区,可以有超过 1 个(如果需要的话,可以有 1 个)订阅者,并且可以从超过 1 个交换器获取消息,并且基于不同的交换器绑定键)。此外,交换的类型不止一种(扇出、主题(这可能就是您需要的)......)。

如果这一切听起来很新鲜,请按照 RabbitMQ 提供的教程进行操作:

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

pub.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Run Code Online (Sandbox Code Playgroud)

子.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                  queue='hello',
                  no_ack=True)

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

  • 很高兴您能提供帮助,但他正在询问有关 SelectConnection 适配器的信息。您提到的示例正在使用 BlockingConnection。 (4认同)