The*_*ear 6 python rabbitmq pika
我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息。我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦。
在我使用 amqp 库的旧代码中,我只是声明了一个这样的类:
import amqp as amqp
class MQ():
mqConn = None
channel = None
def __init__(self):
self.connect()
def connect(self):
if self.mqConn is None:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
elif not self.mqConn.connected:
self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
self.channel = self.mqConn.channel()
def sendMQ(self, message):
self.connect()
lMessage = amqp.Message(message)
self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q")
Run Code Online (Sandbox Code Playgroud)
然后在我的代码中的其他地方调用 sendMQ("this is my message"),然后代码继续。我不需要听确认等。
有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ(“这是我的消息”)发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt。我想我只是不确定如何让我的代码在没有所有这些 try/excepts 的情况下继续运行......此外,我不确定如何通过所有回调传递我的消息......
任何帮助表示赞赏!
谢谢。
整个事情都是回调驱动的,因为它是一种异步的做事方式。异步消费者很容易理解,我们可以通过提供回调函数来获取消息。然而,至少对于初学者来说,发布者部分有点难以理解。
通常我们需要一个队列来进行通信,发布者定期从中获取数据。
使用 SelectConnection 的关键是将您的发布消息函数注册到事件循环中,这可以通过connection.add_timeout
. 完成发布后,注册下一轮发布。
下一个问题是在哪里放置初始注册。初始注册可以在通道打开回调中完成。
下面是一个代码片段,以便更好地理解。请注意,它不是生产就绪的。因为它只以每秒 10 条的最大速度发布消息。您需要调整发布间隔并在一次回调中发布更多消息。
class MQ(Object):
def __init___(self, queue):
self.queue = queue
def on_channel_open(self, chn):
self.channel = chn
self.connection.add_timeout(0.1, self.schedule_next_message)
def schedule_next_message(self):
try:
msg = self.queue.get(True, 0.01)
self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
except Queue.Empty:
pass
self.connection.add_timeout(0.1, self.schedule_next_message)
def on_open(self, conn):
self.connection = conn
self.connection.channel(on_open_callback=self.on_channel_open)
def run(self):
# create a connection
self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
try:
self.connection.ioloop.start()
except Exception:
print("exception in publisher")
self.connection.close()
self.connection.ioloop.start()
Run Code Online (Sandbox Code Playgroud)
将 MQ(queue).run() 放在一个单独的线程中,每当你想把消息放入 mq 时,只需将它放入队列对象中即可。
ede*_*ora -4
作为第一种方法,我建议您从帖子末尾提供的发布/订阅示例开始。一旦您理解了这个简单的示例,就可以开始按照最后的代码块之前提供的教程进行操作。该教程有 6 个不同的用例及其 Python 示例。通过前 5 个步骤,您将了解其工作原理。您应该清楚交换器(将消息路由到每个队列的实体)、绑定密钥(用于连接交换器和队列的密钥)、路由密钥(与发布者和消息一起发送的密钥)的概念。交换器使用它来将消息路由到一个队列或另一个队列)和队列(可以存储消息的缓冲区,可以有超过 1 个(如果需要的话,可以有 1 个)订阅者,并且可以从超过 1 个交换器获取消息,并且基于不同的交换器绑定键)。此外,交换的类型不止一种(扇出、主题(这可能就是您需要的)......)。
如果这一切听起来很新鲜,请按照 RabbitMQ 提供的教程进行操作:
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
pub.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Run Code Online (Sandbox Code Playgroud)
子.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3807 次 |
最近记录: |