我需要一个高性能的消息总线为我的应用程序,所以我正在评估ZeroMQ,RabbitMQ和Apache Qpid.为了衡量性能,我正在运行一个测试程序,该程序使用其中一个消息队列实现发布说10,000条消息,并在同一台机器上运行另一个进程来使用这10,000条消息.然后我记录发布的第一条消息和收到的最后一条消息之间的时差.
以下是我用于比较的设置.
RabbitMQ:我使用了"扇出"类型交换和具有默认配置的队列.我使用了RabbitMQ C客户端库.ZeroMQ:我的发布者tcp://localhost:port1使用ZMQ_PUSH套接字发布,My broker侦听tcp://localhost:port1并将消息重新发送到tcp:// localhost:port2,我的消费者tcp://localhost:port2使用ZMQ_PULL套接字侦听.我正在使用代理而不是对等通信ZeroMQ来使性能比较公平到使用代理的其他消息队列实现.QpidC++消息代理:我使用了"扇出"类型交换和具有默认配置的队列.我使用了Qpid C++客户端库.以下是效果结果:
RabbitMQ:接收10,000条消息大约需要1秒钟.ZeroMQ:接收10,000条消息大约需要15毫秒.Qpid:接收10,000条消息大约需要4秒钟.问题:
RabbitMQ或Qpid使其性能更好?注意:
测试是在具有两个分配处理器的虚拟机上完成的.结果可能因硬件而异,但我主要对MQ产品的相对性能感兴趣.
我正在浏览一个开源消息传递软件,经过一些研究,我发现了这三个产品.我已经把这些用于初步测试,让他们处理队列和主题的消息,从我所读到的,所有这三个产品都是大多数公司的开源消息解决方案的好选择.我想知道这些产品可能具有哪些优势?我特别感兴趣的是消息吞吐量,包括持久消息吞吐量,安全性,可伸缩性,可靠性,支持,路由功能,度量和监视等管理选项,以及每个程序在大型业务环境中运行的程度.
我正在研究如何确定我们未来产品的解决方案,我无法真正理解这一点.
有一堆AMQP 0.9.1实现(RabbitMQ,Apache Qpid,OpenAMQ,仅举几例),但没有AMQP 1.0实现,尽管1.0已经在2011年10月完成.好吧,除了SwiftMQ [1].
阅读1.0,它似乎与1.0之前的规范有很大的不同,所以似乎可以理解的是,对一些工作正常的重大改写几乎没什么热情.事实上,我不明白为什么RabbitMQ和其他人不会决定迁移到ZeroMQ而不是AMQP 1.0.
尽管如此,除了一些模糊的承诺,例如"努力始终实施最新的AMQP规范"之外,我还没有找到任何关于1.0 AMQP前规范的实施者的明确声明.
编辑: RabbitMQ确实说
然而,有些东西告诉我,声明已超过3年,即它早于AMQP 1.0的发布.
那么有没有迹象表明AMQP 1.0可能成为一个标准,除了主要银行 - 和微软 - 支持它的事实?顺便说一句.没有自己的实现.
看起来AMQP 0.9.1似乎比1.0更标准.
嗯,有https://github.com/rabbitmq/rabbitmq-amqp1.0,它自称的状态是原型,显然没有工作半年.
[1]我对SwiftMQ的第一印象是我通过其作者对Spring缺乏AMQP支持的咆哮得到的,这就是为什么我暂时不考虑它.我不想指望那个家伙的支持.
有没有人有一个在独立的 junit测试中使用Apache Qpid的例子.
理想情况下,我希望能够动态创建一个队列,我可以在我的测试中放置/获取msgs.所以我没有在我的测试中测试QPid,我会使用集成测试,但是测试处理msgs的方法非常有用,不得不模拟一大堆服务.
我想阅读服务总线的订阅消息.我正在使用qpid-protonpython库.我正在关注此链接以接收消息Proton-Python-Example-Simple-Receive.我正在通过此网址接收来自服务总线的消息 -
url = 'amqps://mynamespace.servicebus.windows.net/SharedAccessKeyName=xxxx/SharedAccessKey=xxxxxxxxx/python-test/Subscriptions/AllMessages'
# python-test is the name of the topic
# AllMessages is the name of the subscription
Run Code Online (Sandbox Code Playgroud)
我收到以下错误 - ERROR:root:The messaging entity 'sb://mynamespace.servicebus.windows.net/sharedaccesskeyname=xxxxx/sharedaccesskey=xxxxxxxxxxxxx/python-test/subscriptions/allmessages' could not be found. TrackingId:c1e4a39edbd44040b2fd48a552d6ae2b_G2, SystemTracker:gateway6, Timestamp:7/19/2017 7:58:51 AM
这是因为上述URL未正确形成.我在网上搜索过,在这方面没有提供适当的文件.通过qpid读取订阅消息的正确URL格式是什么.
python qpid azureservicebus azure-servicebus-queues azure-servicebus-topics
弹簧配置
<bean id="jmsQueueConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg index="0"
value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsQueueConnectionFactory" />
<property name="sessionCacheSize" value="1" />
<property name="reconnectOnException" value="true" />
</bean>
<bean id="myDestination" class="org.apache.qpid.client.AMQAnyDestination">
<constructor-arg index="0" value="ADDR:myqueue; {create: always}" />
</bean>
<bean id="myServiceBean" class="com.test.MyService" />
<bean id="myContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="exceptionListener" ref="cachingConnectionFactory" />
<property name="messageListener" ref="myServiceBean" />
<property name="concurrentConsumers" value="1" />
<property name="autoStartup" value="true" />
<property name="destination" ref="myDestination" />
<property name="recoveryInterval" value="10000" />
</bean>
Run Code Online (Sandbox Code Playgroud)
MyService.java
public class MyService implements MessageListener {
public void …Run Code Online (Sandbox Code Playgroud) 对这两个术语有点困惑,我在想具有持久消息但是瞬态(非持久)队列的目的是什么?毕竟,如果代理重新启动并且队列未恢复,则将浪费恢复的消息.
我正在考虑使用 RabbitMQ 或 ActiveMQ 等产品。我看到在某种程度上,这些产品对 AMQP v1.0 提供了一定程度的支持。
但是,我正在努力寻找将 AMQP 1.0 与 .NET 结合使用的客户端。到目前为止,我遇到的唯一一个是 Apache Qpid。但是,必须安装其所有依赖项并构建它,而不是仅使用 NuGet 之类的东西来获取客户端库,这似乎有点麻烦,并且不会让我有信心继续支持。
Microsoft Azure 服务总线支持 AMQP 1.0 并有一个客户端,但据我所知,它似乎特定于该产品,这很遗憾,因为我认为 AMQP 的一个好处是可以轻松切换不同的代理。
从我在 RabbitMQ 上读到的内容来看,他们似乎没有承诺为 AMQP 1.0 构建客户端。
有没有人对以上有任何想法,是否有任何我忽略的客户?
提前谢谢了
我正在尝试使用C中的Qpid Proton编写AMQP 1.0客户端.我不想使用messenger.我想使用质子c引擎.我在弄清楚如何做到这一点时遇到了一些麻烦.我的主要关键点是为连接设置端点.使用我能找到的质子c引擎的C客户端的唯一例子就在这里.
https://github.com/apache/qpid-proton/blob/master/examples/engine/c/psend.c
但是,它使用的结构不属于Qpid Proton C API 0.12.0.具体来说,我没有看到pn_driver_t或pn_connector_t作为0.12.0 API的一部分.
我试图遵循AMQP 1.0规范中定义的一般工作流程1)创建连接,2)创建会话,3)创建发送者链接.我对C不太熟悉,这是我第一次使用Qpid Proton库的非信使部分,如果我错过了一些明显的东西,请原谅我.这是我目前的代码.我一直在尝试不同的选择并寻找好几天.
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/engine.h"
//State integer values are defined in Connection macros
//https://qpid.apache.org/releases/qpid-proton-0.12.0/proton/c/api/group__connection.html
void print_state(char * name, pn_state_t state)
{
printf("[%s] local: %i, remote: %i\n", name, PN_LOCAL_MASK & state, PN_REMOTE_MASK & state);
}
//Reference https://github.com/apache/qpid-proton/blob/master/examples/engine/c/psend.c
void send_engine()
{
struct pn_connection_t * connection;
connection = pn_connection();
//STACKOVERFLOW - I have …Run Code Online (Sandbox Code Playgroud) 我正在将 Qpid Proton Python 用于 AMQP 使用者,该使用者运行的作业可以持续 +1 分钟。
工作完成后,我得到了一个connection_closedwith Condition('amqp:resource-limit-exceeded', 'local-idle-timeout expired')。
我了解发生这种情况是因为我的阻塞工作阻止了心跳。
令我困惑的是为什么我没有重新连接。调试质子,我得到了这行代码,其中self.connection.state的值为36,因此self.connection.state & Endpoint.LOCAL_ACTIVE返回 0。
这是重现场景的工作代码:
from __future__ import print_function
from time import sleep
from proton.handlers import MessagingHandler
from proton.reactor import Container
class ExampleConsumer(MessagingHandler):
def __init__(self, queue):
super().__init__(2, False)
self.queue = queue
def on_start(self, event):
self.container = event.container
self.conn = event.container.connect(url='localhost:5672')
self.receiver = event.container.create_receiver(self.conn, self.queue)
print('listening for new messages on /' + self.queue)
def …Run Code Online (Sandbox Code Playgroud)