标签: rabbitmq

RabbitMQ使用立即和强制位

我已经使用RabbitMQ服务器并在立即字段设置为true时发布消息,我尝试发送50,000条消息并使用rabbitmqctl list_queues,我看到队列中的消息数量为零.然后我将立即标志更改为false并再次尝试发送50,000条消息然后使用rabbitmqctl list_queues我看到总共100,000条消息在队列中.(直到现在还没有消费者存在)

之后我开始使用消费者并且它消耗了所有100,000条消息.任何人都可以帮助我理解立即位字段和这种行为.另外,我无法理解强制位字段的概念.

提前感谢.

Gurpreet Singh.

amqp rabbitmq

50
推荐指数
2
解决办法
2万
查看次数

面向服务的体系结构 - AMQP或HTTP

一点背景.

非常大的单片Django应用程序.所有组件都使用相同的数据库.我们需要分离服务,以便我们可以独立升级系统的某些部分而不影响其余部分.

我们使用RabbitMQ作为Celery的经纪人.

现在我们有两个选择:

  1. 使用REST接口的HTTP服务.
  2. JSONRPC over AMQP到事件循环服务

我的团队倾向于HTTP,因为这是他们熟悉的,但我认为使用RPC而不是AMQP的优势远大于它.

AMQP为我们提供了轻松添加负载平衡和高可用性以及保证消息传递的功能.

而使用HTTP我们必须创建客户端HTTP包装器以使用REST接口,我们必须放入负载平衡器并设置该基础结构以便具有HA等.

使用AMQP,我可以生成另一个服务实例,它将连接到与其他实例相同的队列以及bam,HA和负载平衡.

我对AMQP的看法是否遗漏了什么?

soa scaling http amqp rabbitmq

50
推荐指数
2
解决办法
2万
查看次数

如何在RabbitMQ中设置多次重试?

我正在使用RabbitMQ,我有一个包含电子邮件的队列.我的消费者服务将消息排队并尝试发送消息.如果由于任何原因,我的消费者无法发送消息,我想重新排队消息再次发送.我意识到我可以做一个basicNack并将requeue标志设置为true,但是,我不想无限期地重新排列该消息(例如,如果我们的电子邮件系统出现故障,我不想连续重新排队未发送的消息).我想定义有限次数,我可以重新排列要再次发送的消息.但是,当我将它出列并发送一个nack时,我无法在电子邮件消息对象上设置字段.更新的字段不存在于队列中的消息上.还有其他方法可以解决这个问题吗?提前致谢.

.net queue rabbitmq consumer

50
推荐指数
3
解决办法
3万
查看次数

在pika/RabbitMQ中处理长时间运行的任务

我们正在尝试建立一个基本的有向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取任务,处理它并确认该消息.

问题是,处理可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开连接.

这是我们的消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

第一个任务完成后,在BlockingConnection内部的某处抛出异常,抱怨套接字已重置.此外,RabbitMQ日志显示消费者因未及时响应而断开连接(为什么重置连接而不是发送FIN很奇怪,但我们不会担心这一点).

我们搜索了很多,因为我们认为这是RabbitMQ的正常使用案例(有许多长期运行的任务应该在许多消费者中分开),但似乎没有其他人真正有这个问题.最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳.

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag …
Run Code Online (Sandbox Code Playgroud)

rabbitmq pika

49
推荐指数
5
解决办法
2万
查看次数

RabbitMQ中的主题交换与直接交换

我们有一个应用程序将使用RabbitMQ并有几个不同的队列在层之间传递消息.

最初,我计划使用多个直接交换,每个消息类型一个,但看起来使用不同的路由键绑定与队列进行单个主题交换将实现相同的目的.

进行单次交换似乎也更容易维护,但我想知道是否有任何好处(如果有的话)以另一种方式做到这一点?

选项1,使用多个直接交换:

ExchangeA (type: direct)
-QueueA

ExchangeB (type: direct)
-QueueB

ExchangeC (type: direct)
-QueueC
Run Code Online (Sandbox Code Playgroud)

选项2,使用单一主题交换:

Exchange (type: topic)
-QueueA  (receives messages from exchange with routing key of "TypeA")
-QueueB  (receives messages from exchange with routing key of "TypeB")
-QueueC  (receives messages from exchange with routing key of "TypeC")
Run Code Online (Sandbox Code Playgroud)

message-queue rabbitmq rabbitmq-exchange

48
推荐指数
3
解决办法
3万
查看次数

RabbitMQ - 消息传递顺序

我需要为我的新项目选择一个新的队列代理.

这次我需要一个支持pub/sub的可伸缩队列,并且必须保持消息排序.

我读过亚历克西斯的评论:他写道:

"事实上,我们认为RabbitMQ比Kafka提供更强的订购"

我在rabbitmq docs中阅读了消息订购部分:

"消息可以使用AMQP方法返回队列,这些方法具有重新排队参数(basic.recover,basic.reject和basic.nack),或者由于在保留未确认的消息时关闭了通道...使用2.7.0及更高版本它仍然有可能为个人消费者,观察消息无序如果队列中有多个用户.这是因为谁可能重新排队消息的其他用户的行为.从队列中的消息总是在发布顺序举行的视角. "

如果我需要按订单处理消息,我只能使用带有独占队列的rabbitMQ给每个消费者吗?

RabbitMQ仍然被认为是有序消息排队的一个很好的解决方案吗?

queue message-queue rabbitmq

48
推荐指数
3
解决办法
4万
查看次数

RabbitMQ - RabbitMQ可以在一台服务器上处理多少个队列?

我想知道RabbitMQ在单个服务器上可以处理多少个最大队列?

它取决于RAM吗?它取决于erlang进程吗?

message-queue amqp rabbitmq

48
推荐指数
2
解决办法
4万
查看次数

RabbitMQ(beam.smp)和高CPU /内存负载问题

我有一个debian盒用芹菜和rabbitmq运行任务大约一年.最近我注意到任务没有被处理,所以我登录系统,发现芹菜无法连接到rabbitmq.我重新启动了rabbitmq-server,尽管芹菜不再抱怨它现在没有执行新的任务.奇怪的是,rabbitmq正在疯狂地吞噬cpu和内存资源.重新启动服务器无法解决问题.花了几个小时在网上寻找解决方案后无济于事我决定重建服务器.

我用Debian 7.5,rabbitmq 2.8.4,芹菜3.1.13(Cipater)重建了新的服务器.大约一个小时左右,一切都工作得很好,直到芹菜开始再次抱怨它无法连接到rabbitmq!

[2014-08-06 05:17:21,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 6.00 seconds...
Run Code Online (Sandbox Code Playgroud)

我重新启动了rabbitmq service rabbitmq-server start和同样的问题:

rabbitmq开始再次膨胀,不断冲击cpu并慢慢接管所有ram并交换:

PID USER      PR  NI  VIRT  RES  SHR S  %CPU %MEM    TIME+  COMMAND
21823 rabbitmq  20   0  908m 488m 3900 S 731.2 49.4   9:44.74 beam.smp
Run Code Online (Sandbox Code Playgroud)

结果rabbitmqctl status如下:

Status of node 'rabbit@li370-61' ...
[{pid,21823},
 {running_applications,[{rabbit,"RabbitMQ","2.8.4"},
                        {os_mon,"CPO  CXC 138 46","2.2.9"},
                        {sasl,"SASL  CXC 138 11","2.2.1"},
                        {mnesia,"MNESIA  CXC 138 12","4.7"},
                        {stdlib,"ERTS  CXC 138 10","1.18.1"},
                        {kernel,"ERTS  CXC …
Run Code Online (Sandbox Code Playgroud)

erlang debian mnesia rabbitmq celery

47
推荐指数
2
解决办法
7万
查看次数

如何从其他渠道恢复未确认的AMQP消息,而不是我自己的连接?

似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on …
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq celery celeryd pika

46
推荐指数
3
解决办法
5万
查看次数

如何在启动RabbitMQ Docker容器时添加初始用户?

目前我正在使用DockerHub中的默认RabbitMQ映像启动RabbitMQ Docker容器.使用以下命令.

docker run --restart=always \
-d \
-e RABBITMQ_NODENAME=rabbitmq \
-v /opt/docker/rabbitmq/data:/var/lib/rabbitmq/mnesia/rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
--name rabbitmq rabbitmq:3-management
Run Code Online (Sandbox Code Playgroud)

我需要在首次启动映像时提供默认用户/和虚拟主机.例如,创建默认的"测试用户".

目前,我必须通过使用管理插件并通过web ui添加用户/虚拟主机来手动执行此操作.有没有办法在启动RabbitMQ图像时提供默认设置?

rabbitmq docker

45
推荐指数
7
解决办法
3万
查看次数