Zeromq:将majordomo代理与异步客户端一起使用

use*_*878 8 distributed-computing zeromq

在阅读zeromq指南时,我遇到了客户端代码,它在循环中发送100k请求,然后在第二个循环中接收回复.

#include "../include/mdp.h"
#include <time.h>


int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdp_client_send (session, "echo", &request);
    }
    printf("sent all\n");

    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
        printf("reply received:%d\n", count);
    }
    printf ("%d replies received\n", count);
    mdp_client_destroy (&session);
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我添加了一个计数器来计算worker(test_worker.c)发送给代理的回复数,以及mdp_broker.c中的另一个计数器来计算代理发送给客户端的回复数.这两个都高达100k,但客户端只收到大约37k回复.

如果客户端请求的数量设置为大约40k,那么它将收到所有回复.当客户端发送超过40k的异步请求时,有人可以告诉我为什么数据包会丢失吗?

我尝试将代理套接字的HWM设置为100k,但问题仍然存在:

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    int64_t hwm = 100000;
    //  Initialize broker state
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));

    zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}
Run Code Online (Sandbox Code Playgroud)

use*_*878 3

如果不设置 HWM 并使用默认 TCP 设置,仅 50k 消息就会发生数据包丢失。

以下措施有助于减少代理的数据包丢失:

  1. 设置 Zeromq 套接字的 HWM。
  2. 增加 TCP 发送/接收缓冲区大小。

这只能在一定程度上有所帮助。对于两个客户端,每个客户端发送 100k 消息,代理能够很好地管理。但当客户数量增加到三个时,他们就不再收到所有回复。

最后,帮助我确保不丢包的是通过以下方式更改客户端代码的设计:

  1. 客户端一次最多可以发送 N 条消息。客户端的 RCVHWM 和代理的 SNDHWM 应足够高以容纳总共 N 条消息。
  2. 此后,对于客户端收到的每个回复,它都会发送两个请求。