我们正在尝试建立一个基本的有向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取任务,处理它并确认该消息.
问题是,处理可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开连接.
这是我们的消费者的一些伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
第一个任务完成后,在BlockingConnection内部的某处抛出异常,抱怨套接字已重置.此外,RabbitMQ日志显示消费者因未及时响应而断开连接(为什么重置连接而不是发送FIN很奇怪,但我们不会担心这一点).
我们搜索了很多,因为我们认为这是RabbitMQ的正常使用案例(有许多长期运行的任务应该在许多消费者中分开),但似乎没有其他人真正有这个问题.最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳.
所以代码变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag …Run Code Online (Sandbox Code Playgroud) 似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
credentials=credentials, virtual_host='***')
def handle_delivery(body):
"""Called when we receive a message from RabbitMQ"""
print body
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
connection.channel(on_channel_open)
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.basic_recover(callback=handle_delivery,requeue=True)
try:
connection = pika.SelectConnection(parameters=parameters,\
on_open_callback=on_connected)
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on …Run Code Online (Sandbox Code Playgroud) 使用Python,Pika和RabbitMQ创建延迟(或停放)队列的最简单方法是什么?我见过类似的问题,但Python没有.
在设计应用程序时,我发现这是一个有用的想法,因为它允许我们限制需要重新排队的消息.
总是有可能你会收到比你能处理的更多的消息,可能是HTTP服务器很慢,或者数据库压力太大.
我还发现,在对丢失消息零容忍的情况下出现问题时非常有用,而重新排队无法处理的消息可能会解决这个问题.它也可能导致消息一次又一次排队的问题.可能导致性能问题,并记录垃圾邮件.
我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时处理它们.
问题是当第一个队列中已经存在某些数据时,它会占用第一个队列,而不会消耗第二个队列.但是,当第一个队列为空时,它会转到下一个队列,然后同时消耗这两个队列.
我首先实现了线程,但是想要避开它,当pika库为我做的时候没有太多的复杂性.以下是我的代码:
import pika
mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print body
mq_channel.basic_ack(delivery_tag=method.delivery_tag)
mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
Run Code Online (Sandbox Code Playgroud) 我一直试图找出使用鼠兔时应该使用哪种形式的连接,据我所知,我有两种选择.
无论是BlockingConnection或SelectConnection,但是我真的不知道有关之间的差异这两个(即什么是BlockingConnection阻止?多)
pika说的文件SelectConnection是连接到兔子的首选方式,因为它提供了"多种事件通知方法,包括select,epoll,kqueue和poll".
所以我想知道这两种不同类型的连接有什么含义?
PS:我知道我不应该在标题中添加标签,但在这种情况下,我认为它确实有助于澄清问题.
我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
def doWork(body, args, channel):
r = random.random()
time.sleep(r * 10)
try:
channel.basic_ack(delivery_tag=args.delivery_tag)
except :
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
while True:
time.sleep(0.03)
try:
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, …Run Code Online (Sandbox Code Playgroud) 我一直致力于通过RabbitMQ来完成一些分布式任务.
我花了一些时间试图让Celery做我想做的事情,但却无法让它发挥作用.
然后我尝试使用Pika和刚刚工作的东西,完美无缺,几分钟之内.
使用Pika而不是芹菜,有什么我错过的吗?
我有一个python worker客户端,它会旋转10个worker,每个worker挂钩到RabbitMQ队列.有点像这样:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是,尽管在通道上设置了basic_qos,但是第一个启动的工作人员接受队列中的所有消息,而其他人则闲置在那里.我可以在rabbitmq界面中看到这一点,即使我设置worker_count为1并在队列中转储50条消息,所有50条消息进入"未确认"桶,而我预计1会成为未确认的,而另外49条将成为准备.
为什么这不起作用?
我正在使用Pika处理来自RabbitMQ的数据.由于我似乎遇到了不同类型的问题,我决定编写一个小型测试应用程序来查看如何处理断开连接.
我写了这个测试应用程序,其中包括:
我注意到的是2个问题:
这可能是因为网络问题,数据包丢失,但我发现连接不是很强大.
当脚本在RabbitMQ服务器上本地运行并且我杀死RabbitMQ时,脚本退出时出现错误:"ERROR pika SelectConnection:3:104上的套接字错误"
所以看起来我不能让重新连接策略按原样运行.有人可以查看代码,看看我做错了什么?
谢谢,
松鸦
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant …Run Code Online (Sandbox Code Playgroud) 类似的问题似乎都是基于使用自定义记录器,我很高兴只使用默认/无.我的pika python应用程序运行并接收消息,但几秒钟后崩溃No handlers could be found for logger "pika.adapters.blocking_connection",任何想法?
import pika
credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare('messages')
def message_received(channel, method, properties, body):
print "[x] Received %r" % (body)
channel.basic_consume(message_received, queue='messages', no_ack=True)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
通过添加修复:
import logging
logging.basicConfig()
Run Code Online (Sandbox Code Playgroud)