aru*_*nvg 5 java event-driven-design publish-subscribe zeromq jeromq
场景:
我们正在ZeroMQ
(特别jeroMq
)评估事件驱动机制.
应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.
意见
为了玩游戏我使用jero mq 创建了一个pub
/ sub
pattern inproc:
作为传输(版本:0.3.5)
题
使用inproc:
连同pub
/sub
可行吗?
尝试谷歌搜索,但找不到任何具体的,任何见解?
pub
/ sub
with的代码示例inproc:
使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题A
和B
,和两个用户接收A
并B
分别
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() {
startPublishing(context);
}
});
//Subscriber for topic "A"
executorService.execute(new Runnable() {
@Override
public void run() {
startFirstSubscriber(context);
}
});
// Subscriber for topic "B"
executorService.execute(new Runnable() {
@Override
public void run() {
startSecondSubscriber(context);
}
});
}
/**
* Prepare the publisher and publish
*
* @param context
*/
private static void startPublishing(Context context) {
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
// Write two messages, each with an envelope and content
try {
publisher.sendMore("A");
publisher.send("We don't want to see this");
LockSupport.parkNanos(1000);
publisher.sendMore("B");
publisher.send("We would like to see this");
} catch (Throwable e) {
e.printStackTrace();
}
}
publisher.close();
context.term();
}
/**
* Prepare and receive through the subscriber
*
* @param context
*/
private static void startFirstSubscriber(Context context) {
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber1 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
/**
* Prepare and receive though the subscriber
*
* @param context
*/
private static void startSecondSubscriber(Context context) {
// Prepare our context and subscriber
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber2 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
Run Code Online (Sandbox Code Playgroud)
ZMQ inproc
传输旨在用于不同线程之间的单个进程中.当你说"可以存在于同一个jvm 或不同的节点中 "(强调我的)时,我认为你的意思是你将多个进程作为分布式服务而不是单个进程中的多个线程.
如果是这样,那么不,你想要做的事情将无法使用inproc
. PUB-SUB/inproc
在多个线程之间的单个进程中工作正常.
编辑以解决评论中的其他问题:
之所以用运输像inproc
或者ipc
是因为它是一个小更有效(快),比TCP传输,当你在正确的上下文是使用它们.你可以想象使用混合的传输,但你总是必须绑定并连接在同一个传输上才能使它工作.
这意味着,每个节点将需要多达三个PUB
或SUB
插座-一个tcp
出版商交谈远程主机上的节点,一个ipc
发布者交谈上在同一主机上不同的过程的节点,以及inproc
发布者交谈节点在不同的线程中相同的处理.
实际上,在大多数情况下,您只需使用tcp
传输,只需旋转一个插槽即可 - tcp
无处不在.它可能是有意义的自旋向上多个插槽,如果每个插座负责特定样的信息.
如果有,你永远可以发送一个消息类型给其他线程和不同的消息类型到其他主机的理由,那么多的插座是有道理的,但在你的情况下,它听起来就像,从一个节点的角度来看,所有其他节点是平等的.在那种情况下,我会tcp
在任何地方使用它并完成它.