ZeroMQ,我们可以使用inproc:transport以及pub/sub消息传递模式

aru*_*nvg 5 java event-driven-design publish-subscribe zeromq jeromq

场景:

我们正在ZeroMQ(特别jeroMq)评估事件驱动机制.

应用程序分布在多个服务(发布者和订阅者都是服务)可以存在于同一个jvm或不同节点中,这取决于部署体系结构.

意见

为了玩游戏我使用jero mq 创建了一个pub/ subpattern inproc:作为传输(版本:0.3.5)

  1. 线程发布能够发布(看起来像发布,至少没有错误)
  2. 另一个线程中的订户没有收到任何东西.

使用inproc:连同pub/sub可行吗?

尝试谷歌搜索,但找不到任何具体的,任何见解?

pub/ subwith的代码示例inproc:

使用jero mq(版本:0.3.5)的inproc pub sub的工作代码示例对以后访问此帖子的人有用.一个出版商出版的话题AB,和两个用户接收AB分别

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    //Subscriber for topic "A"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive though the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}
Run Code Online (Sandbox Code Playgroud)

Jas*_*son 5

ZMQ inproc传输旨在用于不同线程之间的单个进程中.当你说"可以存在于同一个jvm 或不同的节点中 "(强调我的)时,我认为你的意思是你将多个进程作为分布式服务而不是单个进程中的多个线程.

如果是这样,那么不,你想要做的事情将无法使用inproc. PUB-SUB/inproc在多个线程之间的单个进程中工作正常.


编辑以解决评论中的其他问题:

之所以用运输像inproc或者ipc是因为它是一个小更有效(快),比TCP传输,当你在正确的上下文是使用它们.你可以想象使用混合的传输,但你总是必须绑定并连接在同一个传输上才能使它工作.

这意味着,每个节点将需要多达三个PUBSUB插座-一个tcp出版商交谈远程主机上的节点,一个ipc发布者交谈上在同一主机上不同的过程的节点,以及inproc发布者交谈节点在不同的线程中相同的处理.

实际上,在大多数情况下,您只需使用tcp传输,只需旋转一个插槽即可 - tcp无处不在.它可能是有意义的自旋向上多个插槽,如果每个插座负责特定样的信息.

如果有,你永远可以发送一个消息类型给其他线程和不同的消息类型到其他主机的理由,那么多的插座是有道理的,但在你的情况下,它听起来就像,从一个节点的角度来看,所有其他节点是平等的.在那种情况下,我会tcp在任何地方使用它并完成它.