gia*_*tas 3 python rabbitmq pika
我设置了一个 RabbitMQ 消费者,如下所示:
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
level=logging.INFO,
format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)
class QueueConsumer(object):
"""The consumer class to manage connections to the AMQP server/queue"""
def __init__(self, queue, logger, parameters, thread_id=0):
self.channel = None
self.connection = None
self.queue_name = queue
self.logger = logger
self.consumer_id = 'Thread: %d' % (thread_id,)
self.parameters = pika.ConnectionParameters(**parameters)
def _on_queue_declared(self, frame):
self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
self.channel.basic_qos(prefetch_count=1)
try:
self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
self.logger.info("{} Declared queue...".format(self.consumer_id))
except Exception as e:
self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))
def _on_channel_open(self, channel):
self.channel = channel
try:
self.channel.queue_declare(queue=self.queue_name,
exclusive=False,
durable=True,
auto_delete=False,
callback=self._on_queue_declared)
self.logger.info("{} Opened Channel....".format(self.consumer_id))
except Exception as e:
self.logger.error('{} {}'.format(self.consumer_id, str(e)))
def _on_connected(self, connection):
connection.channel(self._on_channel_open)
def consume(self):
try:
self.connection = SelectConnection(self.parameters,
self._on_connected)
self.connection.ioloop.start()
except Exception as e:
self.logger.error('{} {}'.format(self.consumer_id, str(e)))
self.connection.close()
self.connection.ioloop.start()
def decode(self, body):
try:
_body = body.decode('utf-8')
except AttributeError:
_body = body
return _body
def handle_delivery(self, channel, method, header, body):
try:
start_time = datetime.datetime.now()
_logger.info("Received...")
_logger.info("Content: %s" % body)
req = json.loads(self.decode(body))
# Do something
sleep(randint(10, 100))
time_taken = datetime.datetime.now() - start_time
_logger.info("[{}] Time Taken: {}.{}".format(
req.get("to_num"), time_taken.seconds, time_taken.microseconds))
except Exception as err:
_logger.exception(err)
if __name__ == "__main__":
workers = 3
pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
try:
pool = ThreadPoolExecutor(max_workers=workers)
start = 1
for thread_id in range(start, (workers + start)):
pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)
except Exception as err:
_logger.exception(err)
Run Code Online (Sandbox Code Playgroud)
我也有一个队列发布者,如下所示:
import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
level=logging.DEBUG,
format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)
class QueuePublisherClient(object):
def __init__(self, queue, request):
self.queue = queue
self.response = None
self.channel = None
self.request = request
self.corrId = str(uuid.uuid4())
self.callBackQueue = None
self.connection = None
parameters = pika.ConnectionParameters(host="0.0.0.0")
self.connection = SelectConnection(
parameters, self.on_response_connected
)
self.connection.ioloop.start()
def on_response(self, ch, method, props, body):
if self.corrId == props.correlation_id:
self.response = body
self.connection.close()
self.connection.ioloop.start()
def on_response_connected(self, connection):
_logger.info("Connected...\t(%s)" % self.queue)
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_connected(self, connection):
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_channel_open(self, channel):
# _logger.info("Channel Opened...\t(%s)" % self.queue)
self.channel = channel
self.channel.queue_declare(queue=self.queue,
durable=True,
exclusive=False,
auto_delete=False,
callback=self.on_queue_declared)
def on_queue_declared(self, frame):
self.channel.basic_publish(exchange="",
routing_key=self.queue,
properties=pika.BasicProperties(),
body=str(self.request))
self.connection.close()
_logger.info("Message Published...\t(%s)" % self.queue)
if __name__ == "__main__":
data = {
'text': 'This is a sample text',
'to_num': '+2547xxxxxxxx'
}
count = 10000
for index in range(count):
data['index'] = index
QueuePublisherClient("test_queue", json.dumps(data))
Run Code Online (Sandbox Code Playgroud)
当我向队列发布 10000 条消息并且消费者未启动时,通过rabbitmqctl list_queues我可以看到 test_queue 有 10000 条消息。当我启动消费者时,我运行rabbitmqctl list_queues并看到队列有 0 条消息。然而,消费者仍在消费队列中的消息。问题是,当我在几秒钟后停止消费者然后重新启动它时,我无法恢复我的消息。我怎样才能避免这个?
这只是对实际情况的模拟,消费者进程被 monit 重新启动,我遭受了消息丢失。
首先,您应该使用最新版本的 Pika。
当您设置时no_ack=True(auto_ack=True在 Pika 1.0 中),RabbitMQ 认为消息在传递时已确认。这意味着当您停止时,消费者在内存(或 TCP 堆栈)中拥有的每条消息都将丢失,因为 RabbitMQ 认为它已被确认。
工作完成后,您应该使用no_ack=False(默认)并确认消息。handle_delivery请注意,如果您的工作需要很长时间,您应该在另一个线程中进行,以防止阻塞 Pika 的 I/O 循环。
请参阅以下文档: https: //www.rabbitmq.com/confirms.html
| 归档时间: |
|
| 查看次数: |
5485 次 |
| 最近记录: |