为什么ZeroMQ PUB在没有连接订户的情况下使消息入队?(嗯,“断开连接的” SUB-s)

Lui*_*jas 5 zeromq

我看到了使用的奇怪行为ZMQ_PUB

我有一个生产.connect()-s不同工艺
.bind()ZMQ_SUB插座上。

全部订阅者.bind(),发布者.connect()-s。

当生产者启动时,它会创建一个ZMQ_PUB套接字并将其.connect()-s连接到不同的进程。然后,它立即开始定期发送消息。

如预期的那样,如果没有连接的订户,它将丢弃所有消息,直到订户启动。

该流正常工作,然后,当订户启动时,它会从那一刻开始接收消息。

现在,问题是:

  1. 我断开订户的连接(停止进程)。
  2. 由于目前我只停止了一个订阅者,因此目前没有活动的订阅者。生产者继续发送消息,应该删除该消息,因为不再有连接的订户…
  3. 我重新启动原始订阅者,它绑定,发布者重新连接...订阅者同时接收到所有产生的消息!

因此,我看到的是生产者在订户关闭时将所有消息排队。一旦套接字重新连接,由于订户进程重新启动,它将发送所有排队的消息。

这里我了解到,发布者应该在没有连接的订阅者的情况下丢弃所有已发送的消息:

ZeroMQ示例

“发布者没有连接的订阅者,那么它只会丢弃所有消息。”

为什么会这样呢?

顺便说一下,我正在Linux上使用C ++进行这些测试。

绑定时,我尝试在订户上设置一个不同的身份,但是没有用。发布者仍然排队消息,并在订阅者重新启动时将其全部传递。

提前致谢,

路易斯


更新:

重要更新!!!!
在发布此问题之前,
我尝试了不同的解决方案。一种是设置ZMQ_LINGER为0,这是行不通的。
我添加了ZMQ:IMMEDIATE,它起作用了,但是我发现ZMQ:IMMEDIATE仅此一项是行不通的。它也需要ZMQ_LINGER
Luis Rojas 3小时前

更新: 根据请求,我正在添加一些简单的测试用例来说明我的观点。一个是简单的订户,它在命令行上运行并接收uri绑定的位置,例如:

$ ./sub tcp://127.0.0.1:50001

另一个是发布者,例如,它接收要连接的uri列表:

./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002

订户接收多达5条消息,然后关闭套接字并退出。我们可以在wireshark上看到两种方式的FIN / ACK交换,以及套接字如何移动到TIME_WAIT状态。然后,发布者开始发送SYN,尝试重新连接(探测ZMQ_PUB知道连接已关闭)

我明确地没有取消订阅套接字,只是关闭了它。我认为,如果套接字关闭,则发布者应自动终止对该连接的任何订阅。

因此,我看到的是:启动订户(一个或多个),启动发布者,然后开始发送消息。订户收到5条消息并结束。同时,发布者继续发送消息,而没有连接订户。我重新启动订阅服务器,并立即收到几条消息,因为它们在发布者端排队。我认为这些排队的消息打破了发布/订阅模型,在该模型中,消息应仅传递给已连接的订户。如果授权方关闭了连接,则应丢弃发给该订户的消息。更重要的是,当订户重新启动时,它可能会决定订阅其他消息,但仍将接收由绑定在同一端口的“先前的化身”订阅的消息。

我的建议是ZMQ_PUB(处于连接模式)在检测到套接字断开连接时,应清除该套接字上的所有订阅,直到它重新连接并且NEW订阅者决定重新订阅为止。

对于语言错误,我深表歉意,但英语不是我的母语。

酒馆的代码:

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc < 2 )
    {
        fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",   
        basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int i;
    try
    {
        for ( i = 1; i < argc; i++ )
        {
            printf( "Connecting to [%s]\n", argv[i] );
            {
                pSocket->connect( argv[i] );
            }
        }
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
        exit ( EXIT_FAILURE );
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush(NULL);

    int msgCounter = 0;
    do
    {
        try
        {
            char msgBuffer[1024];
            sprintf( msgBuffer, "Message #%d", msgCounter++ );
            zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
            printf("Sending message [%s]\n", msgBuffer );
            pSocket->send ( outTask );
            sleep( 1 );
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( true );

    exit ( EXIT_SUCCESS );
}
Run Code Online (Sandbox Code Playgroud)

子代号

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc != 2 )
    {
        fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }
    try
    {
        pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
        try
        {
            zmq::message_t inTask;
            pSocket->recv ( &inTask );
            printf( "Message received : [%s]\n", inTask.data() );
            fflush( NULL );
            msgCounter++;
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
}
Run Code Online (Sandbox Code Playgroud)

use*_*197 5

问:为什么会出现这种情况?

因为SUB实际上仍然处于连接状态(还不够“断开连接”)。

是的,可能会令人惊讶,但是杀死进程SUB,无论是在套接字传输媒体的附加端.bind()还是附加端,并不意味着I/O 泵的有限状态机已“移动”到断开状态-状态。.connect()

鉴于此,PUB-side 没有其他选择,只能考虑 -sideSUB仍然存在并连接(即使进程在 -side 的视线之外被默默地杀死PUB),并且对于这种“分布式”状态,有ZeroMQ协议定义的行为(aPUB方职责)收集a(是的,无形中死了)SUB订阅者的所有临时消息,aPUB方仍然认为公平存在(但可能只是在某个较低的地方遇到一些暂时间歇性的问题) ,在传输 I/O 级别或某些类型的远程 CPU 资源匮乏或并发引入的瞬时间歇性 {本地 | 远程}阻塞状态等)。

所以它缓冲...

In case your assassination of the SUB-side agent would appear to have been a bit more graceful ( using the zeroised ZMQ_LINGER + an adequate .close() on the socket-resource instance ) the PUB-side will recognise the "distributed"-system system-wide Finite-State-Automaton shift into an indeed "DISCONNECT"-ed state and a due change-of-behaviour will happen on the PUB-side of the "distributed-FSA", not storing any messages for this "visibly" indeed "DISCONNECT"-ed SUB -- exactly what the documentation states.

"Distributed-FSA" has but quite a weak means to recognise state-change events "beyond it's horizon of localhost contols. KILL-ing a remote process, which implements some remarkable part of the "distributed-FSA" is a devastating event, not a method how to keep the system work. A good option for such external risks might be


Sounds complex?

Oh, yes, it is complex, indeed. That's exactly why ZeroMQ solved this for us, to be free and enjoy designing our application architectures based on top of these ( already solved ) low level complexities.


Distributed-system FSA ( a system-wide FSA of layered composition of sub-FSA-s )

To just imagine what is silently going on under the hood, imagine just having a simple, tandem pair of FSA-FSA - exactly what the pair of .Context() instances try to handle for us in the simplest ever 1:1 PUB/SUB scenario where the use-case KILL-s all the sub-FSA-s on the SUB-side without giving a shot to acknowledge the intention to the PUB-side. Even the TCP-protocol ( living both on the PUB-side and SUB-side ) has several state-transition from [ESTABLISHED] to [CLOSED] state.


A quick X-ray view on a distributed-systems' FSA-of-FSA-s

( just the TCP-protocol FSA was depicted for clarity )

PUB-side:

在此输入图像描述


.socket( .. ) instance's behaviour FSA:

在此输入图像描述


SUB-side:

在此输入图像描述

(Courtesy nanomsg).