我们有兴趣听听ActiveMQ与RabbitMQ和ZeroMQ的优缺点.还欢迎有关任何其他有趣的消息队列的信息.
我需要一个高性能的消息总线为我的应用程序,所以我正在评估ZeroMQ
,RabbitMQ
和Apache Qpid
.为了衡量性能,我正在运行一个测试程序,该程序使用其中一个消息队列实现发布说10,000条消息,并在同一台机器上运行另一个进程来使用这10,000条消息.然后我记录发布的第一条消息和收到的最后一条消息之间的时差.
以下是我用于比较的设置.
RabbitMQ
:我使用了"扇出"类型交换和具有默认配置的队列.我使用了RabbitMQ C客户端库.ZeroMQ
:我的发布者tcp://localhost:port1
使用ZMQ_PUSH
套接字发布,My broker侦听tcp://localhost:port1
并将消息重新发送到tcp:// localhost:port2,我的消费者tcp://localhost:port2
使用ZMQ_PULL
套接字侦听.我正在使用代理而不是对等通信ZeroMQ
来使性能比较公平到使用代理的其他消息队列实现.Qpid
C++消息代理:我使用了"扇出"类型交换和具有默认配置的队列.我使用了Qpid C++客户端库.以下是效果结果:
RabbitMQ
:接收10,000条消息大约需要1秒钟.ZeroMQ
:接收10,000条消息大约需要15毫秒.Qpid
:接收10,000条消息大约需要4秒钟.问题:
RabbitMQ
或Qpid
使其性能更好?注意:
测试是在具有两个分配处理器的虚拟机上完成的.结果可能因硬件而异,但我主要对MQ产品的相对性能感兴趣.
而不是写自己的图书馆.
我们正在开发一个项目,这个项目将是一个自我划分的服务器池,如果一个部分变得太重,管理员会将其划分并将其作为一个单独的进程放在另一台机器上.它还会警告所有连接的客户端,这会影响连接到新服务器.
我很好奇使用ZeroMQ进行服务器间和进程间通信.我的伴侣宁愿自己动手.我期待社区回答这个问题.
我自己是一个相当新手的程序员,只是学习了消息队列.正如我用Google搜索和阅读,似乎每个人都在使用消息队列来处理各种各样的事情,但为什么呢?是什么让他们比编写自己的图书馆更好?为什么它们如此常见,为什么会有这么多?
这段代码效果很好:
import zmq, json, time
def main():
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.bind("ipc://test")
subscriber.setsockopt(zmq.SUBSCRIBE, '')
while True:
print subscriber.recv()
def main():
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.connect("ipc://test")
while True:
publisher.send( "hello world" )
time.sleep( 1 )
Run Code Online (Sandbox Code Playgroud)
但是这段代码不起作用:
import zmq, json, time
def recv():
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.bind("tcp://localhost:5555")
subscriber.setsockopt(zmq.SUBSCRIBE, '')
while True:
print subscriber.recv()
def send():
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.connect("tcp://localhost:5555")
while True:
publisher.send( "hello world" )
time.sleep( 1 )
Run Code Online (Sandbox Code Playgroud)
它引发了这个错误:
ZMQError:没有这样的设备 …
我刚刚开始使用ZMQ.我正在设计一个应用程序的工作流程为:
我假设PUSH/PULL架构适用于此.请纠正我.
但是我该如何处理这些情况呢?
那么如何设置PUSH/PULL模型中的超时?
编辑:感谢user938949的建议,我得到了一个有效的答案,我正在为后代分享它.
我正在研究使用ZeroMQ作为相当大的分布式系统的消息传递/传输层,主要针对监控和数据收集(许多生产者,少数消费者).
据我所知,目前有两种不同的同一概念实现; ZeroMQ和Crossroads I/O,后者是ZeroMQ的分支(2012年?).
我试图找出使用哪一个并想知道它们之间的差异,但到目前为止还没有找到关于此的更多信息.
例如:
基本上,如何选择一个而不是另一个?
我有三个应用程序互相交谈.一个websocket服务器(1)接受来自浏览器的连接,解析url以查看所需的数据,如果它有内存中的数据,则将其提供给客户端,如果不是从另一个名为"fetcher"的应用程序请求它(2).Fetcher接收此作业,从返回JSON数据的简单API(3)请求它,并将其发送回websocker服务器,后者将其发布到连接的客户端.然后,"Fetcher"开始定期检查该url/job是否有更新,并在发生新数据时将其发送到websocket服务器.
我使用socket.io进行客户端 - websocket服务器通信.Websocket服务器和fetcher通过ZMQ套接字进行会谈.
我用130个连接加载测试websocket服务器.Websocket服务器每秒向130个客户端发布160KB的数据.最初,它使用170mb的RAM连接130个,但很快就增加到1GB,尽管没有新的连接.然后socket.io的心跳信号开始失败,导致连接断开.
我使用Nodetime来获取堆快照.在第130个客户端连接之后,这是内存的外观:
346缓冲区对象,总共44MB.
在四分钟内,Buffer对象的数量急剧增加(同样,没有新连接):其中有3012个,总内存为486MB.再过10分钟,其中有3535个,总内存消耗为573MB.
我使用Mozilla的memwatch找出哪一行添加到内存中,并发现是这个函数:
function notifyObservers(resourceId) {
var data = resourceData[resourceId];
io.sockets.in(resourceId).emit('data', data);
}
Run Code Online (Sandbox Code Playgroud)
如果我评论这些行,内存使用保持不变,这是另一个确认.
任何想法如何发生这种情况?我在ZMQ的订阅者套接字方法中调用了这个函数,我怀疑它与它有关.如果我删除函数并将它们合并为一个函数,这就是生成的代码:
// Receive new resource data
const resourceUpdatedSubscriber = zmq.socket('sub').connect('tcp://localhost:5433');
resourceUpdatedSubscriber.subscribe('');
resourceUpdatedSubscriber.on('message', function (data) {
var resource = JSON.parse(data);
resourceData[resource.id] = resource.data;
io.sockets.in(resourceId).emit('data', resourceData[resource.id]);
});
Run Code Online (Sandbox Code Playgroud)
我的所有代码(包括负载测试)都是公共的,您可以在此处找到此Web套接字服务器:https://github.com/denizozger/node-socketio/blob/master/server.js请参阅第138行.
两个月前我开始学习Javascript和Node.js所以欢迎提出任何意见,谢谢!
我很担心poller在zmq中实际做了什么.zguide最低限度地进入它,并且仅将其描述为从多个套接字读取的方式.这对我来说不是一个令人满意的答案,因为它没有解释如何使用超时套接字.我知道zeromq:如何防止无限期等待?解释推/拉,但不是req/rep模式,这是我想知道如何使用.
我试图问的是:poller如何工作,它的功能如何应用于跟踪套接字及其请求?