如何使用 cppzmq 将 ZeroMQ 消息从 ROUTER 套接字发送到特定的 DEALER 套接字?

Vpa*_*ino 7 c++ sockets zeromq

我已经将这个最小的示例放在一起,以便将消息从路由器套接字发送到特定的DEALER socker(具有其身份集)。当运行这两个程序时,它似乎挂在ROUTER上等待来自DEALER的答复,并且DEALER挂起等待来自ROUTER的请求。因此,看起来ROUTER发送的消息永远不会到达DEALER

\n\n

路由器.cpp

\n\n
#include <iostream>\n#include <zmq.hpp>\n#include <string>\n#include <thread>\n#include <chrono>\n\nint main() {\n    zmq::context_t context;\n    zmq::socket_t socket (context, zmq::socket_type::router);\n    // Enforce sending routable messages only\n    socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1);\n    socket.bind("tcp://*:5555");\n\n    try {\n        std::string jobRequest = "ExampleJobRequest";\n\n        std::cout << "Router: Sending msg: " << jobRequest << std::endl;\n\n        // Set the address, then the empty delimiter and then the request itself\n        socket.send("PEER2", ZMQ_SNDMORE);\n        //socket.send(zmq::message_t(), ZMQ_SNDMORE);\n        socket.send(zmq::str_buffer("ExampleJobRequest")) ;\n\n        // Set the address, then the empty delimiter and then the request itself\n        socket.send("PEER2", ZMQ_SNDMORE);\n        //socket.send(zmq::message_t(), ZMQ_SNDMORE);\n        socket.send(zmq::str_buffer("ExampleJobRequest")) ;\n\n        // Receive the reply from the camera\n        std::cout << "Router: Waiting for reply from camera " << std::endl;\n        zmq::message_t reply;\n        socket.recv(&reply);\n\n        std::cout << "Router: Received " <<  std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;\n    } catch (std::exception e) {\n        std::cout << "Router Error: " << e.what();\n    }\n\n    std::this_thread::sleep_for(std::chrono::seconds(1));\n    socket.close();\n    context.close();\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

经销商.cpp

\n\n
#include <zmq.hpp>\n#include <string>\n#include <iostream>\n#include <thread>\n\nint main (void)\n{\n    //  Prepare our context and socket\n    zmq::context_t context;\n    zmq::socket_t socket (context, zmq::socket_type::dealer);\n\n    std::cout << "Dealer: Connecting to RunJob server\xe2\x80\xa6 \\n";\n    socket.setsockopt(ZMQ_IDENTITY, "PEER2", 5);\n    socket.connect ("tcp://localhost:5555");\n\n    while(true) {\n        try {\n            // Wait for next request from client\n            std::cout << "Dealer: Waiting for request" << std::endl;\n            zmq::message_t request;\n            zmq::message_t empty;\n\n            // Receive request\n            socket.recv(&request);\n\n            std::string requestString = std::string(static_cast<char*>(request.data()), request.size());\n\n            std::cout << "Dealer: Received request" << std::endl;\n            std::cout << requestString << std::endl;\n\n            // ZMQ_SNDMORE - "Specifies that the message being sent is a multi-part message, and that further message parts are to follow"\n            socket.send(zmq::str_buffer("Job completed"), zmq::send_flags::dontwait);\n        }catch (std::exception e) {\n            std::cout << "Router Error: " << e.what();\n        }\n    }\n\n    // Used to set various 0MQ Socket Settings\n    // ZMQ_Linger - Set linger period for socket shutdown\n    socket.setsockopt(ZMQ_LINGER, 0);\n    socket.close();\n    context.close();\n\n    return 0;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我最初认为应该在消息前面添加一个空分隔符 ,socket.send(zmq::message_t(), ZMQ_SNDMORE);但这导致了错误。另外,使用以下内容还会导致在 try/catch 块中引发错误。该错误只是打印“未知错误”:

\n\n
zmq::message_t delimiter(0);\nsocket.send(delimiter, ZMQ_SNDMORE);\n
Run Code Online (Sandbox Code Playgroud)\n\n

使用以下命令创建分隔符也会导致相同的错误:

\n\n
zmq::message_t delimiter(0);\nsocket.send(delimiter, ZMQ_SNDMORE);\n
Run Code Online (Sandbox Code Playgroud)\n\n

据我所知,使用 cppzmq 时,不需要添加空分隔符(我对此可能是错误的,但在阅读并查看其他人的示例并测试我自己的代码后,这就是我确定的)。

\n\n

这是一个非常基本的图表,说明了此设计的最终目标:

\n\n

ROUTER到DEALER消息传递设计

\n\n

在我的研究中,我还没有找到这段代码的好例子。Cppzmq github 的文档和示例很少。

\n\n

以下是我看过的其他一些来源:

\n\n\n

小智 6

ROUTER/DEALER 模式的主要思想是它是 REPLY/REQUEST 的异步概括。然而,您试图反转模式中的套接字,发现它不适合并扭曲代码以尝试使其适合。不要那样做。

你需要做的就是“顺其自然”。在简单方法中(已有示例),经销商应发送第一条消息。然后路由器对此做出响应。

下一个级别是 DEALER 在其启动消息中标识自己。然后,路由器可以向该经销商给出特定的响应。

在下一个级别,您可以实现真正的异步。ROUTER可以获取每个DEALER的标识消息的副本,并使用该消息副本随时向任何DEALER发送异步消息。识别消息的一份副本将附加“PEER2”帧并发送给经销商。这是有效的,因为消息的副本包括路由帧。理想情况下,您还可以删除“消息”帧,仅在副本中保留路由帧。

警告 - 我不使用 cppzmq,我使用 CZMQ。我可以说,使用 CZMQ 这种帧操作非常简单。