标签: pika

在pika/RabbitMQ中处理长时间运行的任务

我们正在尝试建立一个基本的有向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取任务,处理它并确认该消息.

问题是,处理可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开连接.

这是我们的消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

第一个任务完成后,在BlockingConnection内部的某处抛出异常,抱怨套接字已重置.此外,RabbitMQ日志显示消费者因未及时响应而断开连接(为什么重置连接而不是发送FIN很奇怪,但我们不会担心这一点).

我们搜索了很多,因为我们认为这是RabbitMQ的正常使用案例(有许多长期运行的任务应该在许多消费者中分开),但似乎没有其他人真正有这个问题.最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳.

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag …
Run Code Online (Sandbox Code Playgroud)

rabbitmq pika

49
推荐指数
5
解决办法
2万
查看次数

如何从其他渠道恢复未确认的AMQP消息,而不是我自己的连接?

似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on …
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq celery celeryd pika

46
推荐指数
3
解决办法
5万
查看次数

如何在RabbitMQ中创建延迟队列?

使用Python,Pika和RabbitMQ创建延迟(或停放)队列的最简单方法是什么?我见过类似的问题,但Python没有.

在设计应用程序时,我发现这是一个有用的想法,因为它允许我们限制需要重新排队的消息.

总是有可能你会收到比你能处理的更多的消息,可能是HTTP服务器很慢,或者数据库压力太大.

我还发现,在对丢失消息零容忍的情况下出现问题时非常有用,而重新排队无法处理的消息可能会解决这个问题.它也可能导致消息一次又一次排队的问题.可能导致性能问题,并记录垃圾邮件.

python queue delay rabbitmq pika

44
推荐指数
3
解决办法
2万
查看次数

在python/pika中使用多个队列

我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时处理它们.

问题是当第一个队列中已经存在某些数据时,它会占用第一个队列,而不会消耗第二个队列.但是,当第一个队列为空时,它会转到下一个队列,然后同时消耗这两个队列.

我首先实现了线程,但是想要避开它,当pika库为我做的时候没有太多的复杂性.以下是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

31
推荐指数
2
解决办法
8350
查看次数

与鼠兔使用哪种形式的连接

我一直试图找出使用鼠兔时应该使用哪种形式的连接,据我所知,我有两种选择.

无论是BlockingConnectionSelectConnection,但是我真的不知道有关之间的差异这两个(即什么是BlockingConnection阻止?多)

pika说的文件SelectConnection是连接到兔子的首选方式,因为它提供了"多种事件通知方法,包括select,epoll,kqueue和poll".

所以我想知道这两种不同类型的连接有什么含义?

PS:我知道我不应该在标题中添加标签,但在这种情况下,我认为它确实有助于澄清问题.

python rabbitmq pika

28
推荐指数
2
解决办法
5897
查看次数

当我使用pika(python)向RabbitMQ尝试ack消息时出现错误"未知传递标记"

我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, …
Run Code Online (Sandbox Code Playgroud)

python message-queue rabbitmq pika

20
推荐指数
3
解决办法
3万
查看次数

RabbitMQ:芹菜提供什么Pika没有?

我一直致力于通过RabbitMQ来完成一些分布式任务.

我花了一些时间试图让Celery做我想做的事情,但却无法让它发挥作用.

然后我尝试使用Pika和刚刚工作的东西,完美无缺,几分钟之内.

使用Pika而不是芹菜,有什么我错过的吗?

python rabbitmq task-queue celery pika

17
推荐指数
2
解决办法
8058
查看次数

Pika + RabbitMQ:将basic_qos设置为prefetch = 1仍然会消耗队列中的所有消息

我有一个python worker客户端,它会旋转10个worker,每个worker挂钩到RabbitMQ队列.有点像这样:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,尽管在通道上设置了basic_qos,但是第一个启动的工作人员接受队列中的所有消息,而其他人则闲置在那里.我可以在rabbitmq界面中看到这一点,即使我设置worker_count为1并在队列中转储50条消息,所有50条消息进入"未确认"桶,而我预计1会成为未确认的,而另外49条将成为准备.

为什么这不起作用?

qos rabbitmq pika

16
推荐指数
1
解决办法
7879
查看次数

RabbitMQ,Pika和重新连接策略

我正在使用Pika处理来自RabbitMQ的数据.由于我似乎遇到了不同类型的问题,我决定编写一个小型测试应用程序来查看如何处理断开连接.

我写了这个测试应用程序,其中包括:

  1. 连接到Broker,重试直到成功
  2. 连接时创建队列.
  3. 使用此队列并将结果放入python Queue.Queue(0)
  4. 从Queue.Queue(0)获取项目并将其生成回代理队列.

我注意到的是2个问题:

  1. 当我从一台连接到另一台主机上的rabbitmq的主机(在vm内)运行我的脚本时,这些脚本随机退出而不会产生错误.
  2. 当我在安装RabbitMQ的同一主机上运行我的脚本时,它运行正常并继续运行.

这可能是因为网络问题,数据包丢失,但我发现连接不是很强大.

当脚本在RabbitMQ服务器上本地运行并且我杀死RabbitMQ时,脚本退出时出现错误:"ERROR pika SelectConnection:3:104上的套接字错误"

所以看起来我不能让重新连接策略按原样运行.有人可以查看代码,看看我做错了什么?

谢谢,

松鸦

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq pika

15
推荐指数
1
解决办法
2万
查看次数

无法找到记录器"pika.adapters.blocking_connection"的处理程序

类似的问题似乎都是基于使用自定义记录器,我很高兴只使用默认/无.我的pika python应用程序运行并接收消息,但几秒钟后崩溃No handlers could be found for logger "pika.adapters.blocking_connection",任何想法?

import pika

credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare('messages')

def message_received(channel, method, properties, body):
    print "[x] Received %r" % (body)

channel.basic_consume(message_received, queue='messages', no_ack=True)

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

通过添加修复:

import logging
logging.basicConfig()
Run Code Online (Sandbox Code Playgroud)

python amqp pika

15
推荐指数
1
解决办法
7725
查看次数

标签 统计

pika ×10

rabbitmq ×9

python ×7

amqp ×2

celery ×2

celeryd ×1

delay ×1

message-queue ×1

qos ×1

queue ×1

task-queue ×1