跨多个程序实例的 ZeroMQ IPC

Ala*_*che 5 c++ tcp ipc distributed-computing zeromq

我在 ZMQ 中的几个程序实例之间的进程间通信遇到了一些问题

  • 我正在使用 Linux 操作系统
  • 我正在使用 zeromq/cppzmq,libzmq 的仅标头 C++ 绑定

如果我运行此应用程序的两个实例(比如在终端上),我会为一个实例提供一个作为侦听器的参数,然后为另一个提供一个作为发送方的参数。侦听器永远不会收到消息。我试过 TCP 和 IPC 都无济于事。

#include <zmq.hpp>
#include <string>
#include <iostream>

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    zmq_ctx_destroy(& global_zmq_context);
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    zmq::message_t message(static_cast<const void*> (str.data()), str.size());
    rc = publisher.send(message);
    if (rc == -1) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

如果我使用两个线程运行程序的一个实例,则此代码将起作用

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage str);
    t_pub.join();
    t_sub.join();
Run Code Online (Sandbox Code Playgroud)

但是我想知道为什么在运行程序的两个实例时上面的代码不起作用?

谢谢你的帮助!

use*_*197 3

如果您从未使用过 ZeroMQ,那么在深入了解更多细节之前,
您可能会喜欢先看一下“不到五秒的ZeroMQ原理


想知道为什么在运行程序的两个实例时上面的代码不起作用?

这段代码永远不会飞thread——它与基于或process基于处理无关[CONCURENT]

这是由于进程通信的设计错误造成

ZeroMQ 可以为此提供任一受支持的传输类:
{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }此外还可以为进程内通信提供更智能的传输类,这是一种inproc://为线程间通信做好准备的传输类,其中无堆栈通信可以享受有史以来最低的延迟,即只是一个内存映射策略。

选择基于 L3/L2 的网络堆栈进行进程通信是可能的,但这是最“昂贵”的选项


核心错误:

考虑到这种选择,任何单个进程(不是说一对进程)都会在尝试将.bind()接入点连接到同一个TCP/IP 时发生冲突。address:port#


另一个错误:

即使为了启动单独的程序,两个生成的线程都会尝试.bind()其私有AccessPoint,但没有一个线程尝试.connect()匹配的“相反” AccessPoint

至少一个人必须成功.bind()
至少一个人必须成功.connect(),才能获得一个“通道”,这里是PUB/SUB原型。


去做:

  • 决定一个合适的、足够正确的传输类(最好避免为本地主机/进程内 IPC 操作完整的 L3/L2 堆栈)
  • 重构Address:port#管理(让 2 个以上的进程不会在.bind()-(s) 上失败)到相同的(硬连线)address:port#
  • 始终检测并适当处理{PASS|FAIL}API 调用返回的 -s
  • 始终LINGER明确设置为零(你永远不知道)