use*_*878 8 distributed-computing zeromq
在阅读zeromq指南时,我遇到了客户端代码,它在循环中发送100k请求,然后在第二个循环中接收回复.
#include "../include/mdp.h"
#include <time.h>
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
mdp_client_send (session, "echo", &request);
}
printf("sent all\n");
for (count = 0; count < 100000; count++) {
zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
if (reply)
zmsg_destroy (&reply);
else
break; // Interrupted by Ctrl-C
printf("reply received:%d\n", count);
}
printf ("%d replies received\n", count);
mdp_client_destroy (&session);
return 0;
}
Run Code Online (Sandbox Code Playgroud)
我添加了一个计数器来计算worker(test_worker.c)发送给代理的回复数,以及mdp_broker.c中的另一个计数器来计算代理发送给客户端的回复数.这两个都高达100k,但客户端只收到大约37k回复.
如果客户端请求的数量设置为大约40k,那么它将收到所有回复.当客户端发送超过40k的异步请求时,有人可以告诉我为什么数据包会丢失吗?
我尝试将代理套接字的HWM设置为100k,但问题仍然存在:
static broker_t *
s_broker_new (int verbose)
{
broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
int64_t hwm = 100000;
// Initialize broker state
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
self->verbose = verbose;
self->services = zhash_new ();
self->workers = zhash_new ();
self->waiting = zlist_new ();
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
return self;
}
Run Code Online (Sandbox Code Playgroud)
如果不设置 HWM 并使用默认 TCP 设置,仅 50k 消息就会发生数据包丢失。
以下措施有助于减少代理的数据包丢失:
这只能在一定程度上有所帮助。对于两个客户端,每个客户端发送 100k 消息,代理能够很好地管理。但当客户数量增加到三个时,他们就不再收到所有回复。
最后,帮助我确保不丢包的是通过以下方式更改客户端代码的设计: