lai*_*e9m 9 python rabbitmq pika
我是rabbitmq和pika的新手,并且在停止消费方面遇到了麻烦.
通道和队列设置:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)
Run Code Online (Sandbox Code Playgroud)
基本上,消费者和生产者是这样的:
消费者:
def task(task_id):
def callback(channel, method, properties, body):
if body != "quit":
print(body)
else:
print(body)
channel.stop_consuming(task_id)
channel.basic_consume(callback, queue=task_id, no_ack=True)
channel.start_consuming()
print("finish")
return "finish"
Run Code Online (Sandbox Code Playgroud)
制片人:
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None: # running
line = proc.stdout.readline()
if line:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body=line
)
else:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body="quit"
)
break
Run Code Online (Sandbox Code Playgroud)
消费者task给了我输出:
# ... output from sample.sh, as expected
quit
?}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.
Run Code Online (Sandbox Code Playgroud)
但是,"finish"没有打印,所以我猜它是因为channel.stop_consuming(task_id)没有停止消费.如果是这样,那么正确的方法是什么?谢谢.
我有同样的问题.它似乎是由内部start_consuming呼叫引起的self.connection.process_data_events(time_limit=None).这time_limit=None使它挂起.
我设法通过将调用替换channel.start_consuming()为其实现来解决此问题,黑客攻击:
while channel._consumer_infos:
channel.connection.process_data_events(time_limit=1) # 1 second
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5712 次 |
| 最近记录: |