Pat*_* B. 2 c++ multithreading zeromq
众所周知,ZeroMQ套接字不应在应用程序线程之间共享.context_t然而,实例可以.
我有一个多线程应用程序,我想让每个线程不时地与REQ/REP-socket交易对手(事件,异常等)交换消息,这取决于他们正在做什么(他们正在做非ZeroMQ -东西 ).
要将消息发送到我的REQ/REP-socket,我使用以下函数
(半C++半伪代码):
sendMessage:
bool sendMessage(std::string s)
{
zmq::socket_t socket(globalContext(), ZMQ_REQ);
socket.connect("ipc://http-concentrator");
zmq::message_t message(s.size());
memcpy(message.data(), s.data(), s.size());
if (!socket.send(message))
return false;
// poll on socket for POLLIN with timeout
socket.recv(&message);
// do something with message
return true;
}
Run Code Online (Sandbox Code Playgroud)
需要时,从每个线程调用此函数.它创建一个本地套接字,连接,发送消息,并接收响应.在退出时,套接字被断开并被删除(至少我假设它已关闭).
这样,我不需要费心去维护每个线程中的套接字.这是以每次调用此函数时创建和连接为代价的.
我强调了这个代码,并没有看到重用一个套接字和这个重新连接实现之间有太大区别.(我REP/REQ在用例的两侧每秒有20k个事务,包括JSON解码/编码)
问:有没有更正确的ZeroMQ方式呢?
这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给 -storage thread_local。socket_t将-instance存储static在thread_local函数中为我提供了我正在寻找的功能:
class socketPool
{
std::string endpoint_;
public:
socketPool(const std::string &ep) : endpoint_(ep) {}
zmq::socket_t & operator()()
{
thread_local static zmq::socket_t socket(
globalContext(),
ZMQ_REQ);
thread_local static bool connected;
if (!connected) {
connected = true;
socket.connect(endpoint_);
}
return socket;
}
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");
Run Code Online (Sandbox Code Playgroud)
在我的sendMessage()函数中,我只是简单地执行而不是创建和连接
bool sendMessage(std::string s)
{
zmq::socket_t &socket = httpReqPool();
// the rest as above
}
Run Code Online (Sandbox Code Playgroud)
至于性能,在我的机器上速度快了 7 倍。(REQ/REP每秒 140k)。
Nota Bene:在ipc:// transport-class上将O/P从20k TPS更改为140k TPS之前发布了这个答案
Q:是否有更多的做的正确ZeroMQ路这个?
答:不容易说什么是"这个"以及"正确性" - 参数的参数是什么
鉴于此,
以下几点将更加通用
,适用于系统设计阶段推理:
这一点是双刃剑.一些开销总是与基础设施元素的设置和处理(是的,甚至关闭和拆除)REQ-AccessPoint与REQ/REP-pattern相关联,并且相关的基于套接字的传输类在-side REQ主机和两个主机上都会产生一些显着的开销.也是REP这边.
你注意到这是公平的,你在一个大约20k TPS的水平上对这个进行了定量测试,并且没有观察到这种方法的任何不利影响.还不清楚的是,是否还在同一SUT(被测系统)上进行了体内测试,以便为每个相应设计的比较提供一些基线(并允许确定开销的差异)本身).
虽然设计良好的框架会隐藏系统内部行为的这一部分来自用户维护的代码,但这并不意味着,它是一个便宜的,而不是免费的处理.
很明显,在Context()-instance线程中有一些工作被执行(...是的,复数在这里是正确的,因为一些高性能代码可能会受益于每个Context()实例使用多个I/O线程通过模式套接字和它的相应I/O线程处理程序之间明确定义的亲和映射来积极影响工作负载分布(以便以某种方式平衡,如果不能确定性地调整预期的I/O吞吐量,包括.所有相关的开销).
如果仍然存在疑虑,我们应该永远记住,命令式编程风格函数或面向对象的方法主要是外部调用者的受害者,外部调用者决定在何时以及调用这种" 从属 "代码执行单元的频率值班和被执行.函数/方法没有任何自然的反向投票(抑制)它自己从外部调用者调用的频率,而强大的设计根本不能仅仅依赖乐观的假设,这种调用不会更多通常比XYZ-k TPS(上面引用的20k可能适用于体外测试,但实际部署可能会改变几个人的命令(无论是人工测试 - 在测试期间,还是没有 - 在某些高峰时段或用户(系统) ) - 西班牙语或由于某些技术错误或硬件故障(我们都听过很多次关于NIC卡充斥L1/L2流量超出所有可以想象的限制等等 - 我们只是不知道,不知道,何时/何地将在下次再次发生).
提到的REQ/REP可扩展形式通信模式因其存在陷入外部无法解析的分布式内部死锁的风险而闻名.这总是存在避免的风险.缓解策略可能取决于实际用例的风险价值(需要认证医疗器械,fintech用例,控制循环用例,学术研究论文代码或私人爱好玩具).
参考:
REQ/REP死锁>>> /sf/answers/2671411081/
Fig.1:当+ 主要是一个不可解的分布式相互死锁(每个有限状态 - 自动机都等待"另一个"移动)并且永远不会达到"下一个" 内部状态时,为什么使用天真的所有情况都是错误的.REQ/REP[App1]in_WaitToRecvSTATE_W2R[App2]in_WaitToRecvSTATE_W2RREQ-FSA/REP-FSAin_WaitToSendSTATE_W2S
XTRN_RISK_OF_FSA_DEADLOCKED ~ { NETWORK_LoS
: || NETWORK_LoM
: || SIG_KILL( App2 )
: || ...
: }
:
[App1] ![ZeroMQ] : [ZeroMQ] ![App2]
code-control! code-control : [code-control ! code-control
+===========!=======================+ : +=====================!===========+
| ! ZMQ | : | ZMQ ! |
| ! REQ-FSA | : | REP-FSA! |
| !+------+BUF> .connect()| v |.bind() +BUF>------+! |
| !|W2S |___|>tcp:>---------[*]-----(tcp:)--|___|W2R |! |
| .send()>-o--->|___| | | |___|-o---->.recv() |
| ___/ !| ^ | |___| | | |___| ^ | |! \___ |
| REQ !| | v |___| | | |___| | v |! REP |
| \___.recv()<----o-|___| | | |___|<---o-<.send()___/ |
| !| W2R|___| | | |___| W2S|! |
| !+------<BUF+ | | <BUF+------+! |
| ! | | ! |
| ! ZMQ | | ZMQ ! |
| ! REQ-FSA | | REP-FSA ! |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
| ! /\/\/\/\/\/\/\/\/\/\/\| |/\/\/\/\/\/\/\/\/\/\/! |
| ! \/\/\/\/\/\/\/\/\/\/\/| |\/\/\/\/\/\/\/\/\/\/\! |
+===========!=======================+ +=====================!===========+
Run Code Online (Sandbox Code Playgroud)