我想直接从脚本发送一条消息,然后对其进行处理,然后将结果发送回去。因此,这就像一个双重发布-订阅。
我有2个脚本:
客户端直接向处理程序发送一条消息(简单字符串),然后,Processer脚本对字符串中的字符进行计数,然后将结果发送回客户端。
这是我尝试做的事情:
处理程序等待消息,计算出一些内容,然后将答案返回给原始发件人。
#Processer.py:
import pika
import sys
#Sends back the score
#addr: Connection address
#exchName: Exchange name (where to send)
#rKey: Name of the queue for direct messages
#score: The detected score
def SendActualScore(addr, exchName, rKey, score):
#Send the image thru the created channel with the given routing key (queue name)
channel.basic_publish(exchange=exchName, routing_key=rKey, body=score)
print "(*) Sent: " + score
#When we receive something this is called
def CallbackImg(ch, method, properties, body):
print "(*) Received: " + str(body)
score = str(len(body))
#Send back the score
SendActualScore('localhost', 'valami', rKey, score)
#Subscribe connection
#Receive messages thru this
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#RECEIVE MESSAGES - Subscribe
channel.exchange_declare(exchange='valami', type='direct')
#Define a queue, where we don't need the name
#After we disconnected delete the queue (exclusive flag)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue
rKeys = sys.argv[1:]
for rKey in rKeys:
channel.queue_bind(exchange='valami', queue=queue_name, routing_key = rKey)
channel.basic_consume(CallbackImg, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
客户端只发送消息,然后等待答案。
#Client.py:
import pika
import sys
connAddr = 'localhost'
#Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(connAddr))
channel = connection.channel()
#Define an exchange channel, we don't need a queue
channel.exchange_declare(exchange='valami', type='direct')
#Send the image thru the created channel
channel.basic_publish(exchange='valami', routing_key='msg', body='Message in the body')
print "[*] Sent"
def Callback(ch, method, properties, body):
print "(*) Received: " + str(body)
result = channel.queue_declare(exclusive=True)
#We need the name of our temporary queue
queue_name = result.method.queue
channel.queue_bind(exchange='valami', queue=queue_name)
channel.basic_consume(Callback, queue=queue_name, no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
可能有多个客户端,但我不知道如何直接将消息发送回给他们。
您检查过 RabbitMQ w/ python 和 pika 中的 RPC 教程吗?http://www.rabbitmq.com/tutorials/tutorial-6-python.html
您需要在客户端执行的操作要点可在 RPC 教程中找到,但需要进行一些修改。
在您的客户端中,您将需要创建一个独占队列 - 与您在服务器中所做的方式相同。
当您从客户端发送消息时,您需要将 设为reply_to客户端独占队列的名称
来自教程:
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
Run Code Online (Sandbox Code Playgroud)
在服务器上,当您收到消息时,您需要reply_to从消息中读取标头,然后basic_publish读取对该队列的回复。
与其考虑“客户端”和“服务器”,不如用“消息生产者”和“消息消费者”来构建它可能会有所帮助。
在您的场景中,您需要两个进程既是发布者又是消费者。“客户端”将发布原始消息并使用响应。“服务器”将使用原始消息并发布响应。
reply_to代码中唯一真正的区别是原始消息中标头的使用。这是您应将响应发布到的队列的名称。
希望有帮助!
PS 我在我的RabbitMQ 模式电子书中介绍了这一点的核心概述- RPC 和请求/回复,就像您需要的那样。这本书讲的是原理和模式,而不是具体的编程语言(尽管我主要写node.js,并不真正了解python)。
| 归档时间: |
|
| 查看次数: |
2546 次 |
| 最近记录: |