Chu*_*tin 5 python rabbitmq apache-kafka
我有来自不同应用程序的多个生产者向 RabbitMQ 中的主题发送消息。来自不同应用程序的多个消费者正在阅读这些主题。这个简单的架构作为 PoC 已经完美运行。但现在我有来自这些应用程序的多个实例,并且我不希望应用程序 X 实例 1 读取与应用程序 X 实例 2 相同的消息。但是应用程序 X 和应用程序 Y(及其所有实例)需要从同一主题变为红色。
我知道如果消费者共享相同的消费者 ID,Karaf 会平衡主题消息的消耗。RabbitMQ中有这个功能吗?我一直在阅读文档,但没有找到这样的内容。
我相信你需要kafka的消费者组功能。
(对于每一条消息,不同的消费者组应该一起消费它,但每个消费者组中只有一个消费者可以消费这条消息)
参见rabbitmq getstarted,可以结合Topics
mode和Work queues
mode来实现这个功能。
示例代码
接收.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
queue_name = sys.argv[1]
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key='my_key')
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
发送.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = 'my_key'
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
Run Code Online (Sandbox Code Playgroud)
如何运行它?
你需要启动4个消费者来解释这个问题:
python receive.py consumer_group1
python receive.py consumer_group1
python receive.py consumer_group2
python receive.py consumer_group2
Run Code Online (Sandbox Code Playgroud)
以上交叉响应 app1(instance1)、app1(instance2)、app2(instance1)、app2(instance2)
然后,启动send.py:
python send.py
Run Code Online (Sandbox Code Playgroud)
您只会看到可以获取该消息的每个应用程序的一个实例。如果您再次发送,来自两个不同应用程序的另一个实例可以接收该消息。
归档时间: |
|
查看次数: |
4634 次 |
最近记录: |