Hug*_*usa 12 python queue parallel-processing asynchronous rabbitmq
我在Python中使用RabbitMQ来管理生产者和多个消费者之间的几个队列.在RabbitMQ网站(路由模型)的示例中,消费者被阻止.这意味着它们会在start_consuming()上停止并在每次队列中有新的"任务"时执行回调函数.
我的问题是:如何以他仍在等待任务的方式实现我的消费者(因此,每次队列中有新的东西时都会调用回调函数)但同时他可以执行其他工作/代码.
谢谢
表格常见问题:
Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,
所以让我们在线程内部创建连接:
import pika
class PikaMassenger():
exchange_name = '...'
def __init__(self, *args, **kwargs):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.conn.channel()
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type='topic')
def consume(self, keys, callback):
result = self.channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
for key in keys:
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=key)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True)
self.channel.start_consuming()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.conn.close()
def start_consumer():
def callback(ch, method, properties, body):
print(" [x] %r:%r consumed" % (method.routing_key, body))
with PikaMassenger() as consumer:
consumer.consume(keys=[...], callback=callback)
consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()
Run Code Online (Sandbox Code Playgroud)
小智 -4
对于接收器
import pika
messages = []
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='message')
def callback(ch, method, properties, message):
print(message)
messages.append(message)
channel.basic_consume(callback,queue='message',no_ack=True)
Run Code Online (Sandbox Code Playgroud)
和
channel.basic_consume(callback,queue='message',no_ack=True)
Run Code Online (Sandbox Code Playgroud)
当你需要时) 或在线程中
import threading
import pika
import time
messages = []
def recieve_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
messages.append(body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
# channel.start_consuming()
mq_recieve_thread = threading.Thread(target=channel.start_consuming)
mq_recieve_thread.start()
recieve_messages()
while True:
print(messages)
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6791 次 |
| 最近记录: |