我对ZeroMQ很新.我已阅读该指南,目前正在浏览这些示例以及查看网络上的其他相关信息.我对使用什么消息模式或者我应该使用2种模式的组合有一些犹豫不决.
我有一个现有的软件应用程序,它有一个需要更换的本地消息系统.我有一个相当简单的架构:
|Client|<----->|driver1|
|
|---|driverN|
Run Code Online (Sandbox Code Playgroud)
目前一次只有一个"客户端"连接到驱动程序,并且可能有许多驱动程序.
(实际上,在这种情况下,客户端不是真正的我的客户端应用程序,而是各种类型的中间人.对于此讨论,它可以被视为客户端)
消息:
驱动程序可能存在于同一系统上,也可能存在于LAN上.这不是一个公共网络.
我目前正在考虑在每个驱动程序上都有一个pub和sub套接字,在客户端上有一个sub/pub套接字.建立连接后不应删除消息.我假设客户端将订阅不同的驱动程序数据类型,然后驱动程序将订阅客户端命令消息.
重要注意事项:低延迟,最低可能的带宽开销.
我将不胜感激任何建议或建议!提前致谢!
你选择了一个很好的学习练习,这是肯定的!
阅读这些内容,它们使用自定义路由器到路由器代理和轮询提供请求/回复的基本实现,并应解决您的客户端到设备问题.
解决方案是同步的,因此从客户端发送的任何请求都会阻塞,直到获得响应.我个人会对请求和回复使用异步以获得完全的灵活性,但该解决方案更复杂.但是,书中有一些例子Freelance,Dealer/Router它们说明了异步请求/回复.
这是同步多对多请求/回复的示例.你必须知道ZeroMq包络是如何工作的,以充分理解这种方法的机制; 参见示例lbbroker1.
设置客户端标识setIdentity(); 对于响应路由很重要.
客户端发送的请求device1,device2等等,在一个循环; 如果设备存在,则从特定设备返回状态消息,否则,"无设备"返回给客户端.
Socket client = context.socket(ZMQ.REQ);
client.setIdentity("client1".getBytes());
client.connect("tcp://localhost:5550");
for( int i = 0; i < 5; i++){
client.send("device" + i);
String reply = client.recvStr();
log("Received message: " + reply);
Thread.currentThread().sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)
设备设置ID就像客户端的唯一路由一样.
设备发送device.send("DEVICEREADY")到服务器以指示在线可用性.
设备执行recvStr()三次从服务器读取完整信封.
String deviceId = "device1"
Socket device = context.socket(ZMQ.REQ);
device.setIdentity(deviceId.getBytes());
device.connect( "tcp://localhost:5560");
device.send( "DEVICEREADY");
while (!Thread.currentThread().isInterrupted()) {
String clientAddress = device.recvStr();
String empty = device.recvStr();
String clientRequest = device.recvStr();
//create envelope to send reply to same client who made request
device.sendMore(clientAddress);
device.sendMore("");
device.send( "stauts on " + deviceId + " is ok");
}
Run Code Online (Sandbox Code Playgroud)
使用ROUTER套接字的自定义代理; 客户端连接到前端ROUTER套接字,而设备连接到后端路由器.服务器在两个套接字上轮询消息.
Context context = ZMQ.context(1);
Socket frontend = context.socket(ZMQ.ROUTER);
Socket backend = context.socket(ZMQ.ROUTER);
frontend.bind( "tcp://localhost:5550");
backend.bind( "tcp://localhost:5560");
Poller poller = new Poller(2);
poller.register(frontend, Poller.POLLIN);
poller.register(backend, Poller.POLLIN);
while (!Thread.currentThread().isInterrupted()) {
poller.poll();
//frontend poller
if (poller.pollin(0)) {
String clientId = frontend.recvStr();
String empty = frontend.recvStr(); //empty frame
String deviceId = frontend.recvStr();
//if client is requesting to talk to nonexistent deviceId,
//return message "no device", otherwise, create envelope and send
//request on backend router to device.
if( deviceMap.get( deviceId) == null ){
frontend.sendMore(clientId);
frontend.sendMore("");
frontend.send("no deviceId: " + deviceId);
} else {
//request envelope addressed to specific device
backend.sendMore(deviceId);
backend.sendMore("");
backend.sendMore(clientId);
backend.sendMore("");
backend.send("hello from " + clientId);
}
}
//backend poller
if(poller.pollin(1)){
String deviceId = backend.recvStr();
String empty = backend.recvStr();
String clientId = backend.recvStr();
//device signaling it's ready
//store deviceId in map, don't send a response
if( clientId.equals("DEVICEREADY"))
deviceMap.put(deviceId, deviceId);
else {
//the device is sending a response to a client
//create envelope addressed to client, send on frontend socket
empty = backend.recvStr();
String reply = backend.recvStr();
frontend.sendMore(clientId);
frontend.sendMore("");
frontend.send(reply);
}
}
}
Run Code Online (Sandbox Code Playgroud)