小编use*_*878的帖子

在python/pika中使用多个队列

我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时处理它们.

问题是当第一个队列中已经存在某些数据时,它会占用第一个队列,而不会消耗第二个队列.但是,当第一个队列为空时,它会转到下一个队列,然后同时消耗这两个队列.

我首先实现了线程,但是想要避开它,当pika库为我做的时候没有太多的复杂性.以下是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

31
推荐指数
2
解决办法
8350
查看次数

信号处理pika/python

pika.BlockingConnection在消费者中使用,为每条消息执行一些任务.我还添加了信号处理功能,以便消费者在完成所有任务后正常死亡.

在处理消息并收到信号时,我只是"signal received"从函数中获取,但代码不会退出.所以,我决定检查在回调函数结束时收到的信号.问题是,我检查信号的次数是多少,因为此代码中还有更多的功能.是否有更好的处理信号的方法而不会过度使用东西?

import signal
import sys
import pika
from time import sleep

received_signal = False
all_over = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()

def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"

    if received_signal:
        print "Exiting, as a kill signal is already …
Run Code Online (Sandbox Code Playgroud)

python signals signal-handling pika

6
推荐指数
1
解决办法
1378
查看次数

标签 统计

pika ×2

python ×2

rabbitmq ×1

signal-handling ×1

signals ×1