为什么特定的UDP消息总是低于特定的缓冲区大小?

Tre*_*key 14 c sockets networking udp vxworks

3个不同的消息以不同的速率发送到同一个端口:

Message  size (bytes)  Sent everytransmit speed
High           232                 10 ms          100Hz                  
Medium     148                 20ms           50Hz                    
Low            20                   60 ms          16.6Hz                 

我每隔约6毫秒只能处理一条消息.
单线程.阻止阅读.


一种奇怪的情况正在发生,我没有解释.
当我将接收缓冲区设置为4,799字节时,我的所有低速消息都会被丢弃.
我看到可能有一两个被处理,然后什么都没有.

当我将接收缓冲区设置为4,800(或更高!)时,似乎所有低速消息都开始被处理.我看到大约每秒16/17.


这一点一直被观察到.发送数据包的应用程序始终在接收应用程序之前启动.接收应用程序在创建套接字之后以及开始处理之前总是有很长的延迟.因此,处理开始时缓冲区始终为满,并且每次测试发生时它都不是相同的起始缓冲区.这是因为套接字是在发送方已经发送消息之后创建的,因此接收方可能会在发送周期的中间开始监听.

为什么增加单个字节的接收缓冲区大小会导致低速消息处理发生巨大变化?

我构建了一个表来更好地可视化预期的处理:
在此输入图像描述

当其中一些消息得到处理时,可能会有更多消息被放入队列而不是被丢弃.

尽管如此,我希望4,799字节缓冲区的行为方式与4,800字节相同.

然而,这不是我所观察到的.


我认为这个问题与低速消息与其他两个消息同时发送的事实有关.它始终在高/中速信息之后接收.(这已经通过wireshark确认).

例如,假设缓冲区开始时为空,很明显低速消息需要比其他消息排队更长的时间.
*每6ms 1条消息大约每30ms发送5条消息. 在此输入图像描述

这仍然不能解释缓冲区大小.

我们正在运行VxWorks,并使用他们的sockLib,它是Berkeley套接字的一个实现.这是我们的套接字创建的代码片段:
SOCKET_BUFFER_SIZE是我正在改变的.

struct sockaddr_in tSocketAddress;                          // Socket address
int     nSocketAddressSize = sizeof(struct sockaddr_in);    // Size of socket address structure
int     nSocketOption = 0;

// Already created
if (*ptParameters->m_pnIDReference != 0)
    return FALSE;

// Create UDP socket
if ((*ptParameters->m_pnIDReference = socket(AF_INET, SOCK_DGRAM, 0)) == ERROR)
{
    // Error
    CreateSocketMessage(ptParameters, "CreateSocket: Socket create failed with error.");

    // Not successful
    return FALSE;
}

// Valid local address
if (ptParameters->m_szLocalIPAddress != SOCKET_ADDRESS_NONE_STRING && ptParameters->m_usLocalPort != 0)
{
    // Set up the local parameters/port
    bzero((char*)&tSocketAddress, nSocketAddressSize);
    tSocketAddress.sin_len = (u_char)nSocketAddressSize;
    tSocketAddress.sin_family = AF_INET;
    tSocketAddress.sin_port = htons(ptParameters->m_usLocalPort);

    // Check for any address
    if (strcmp(ptParameters->m_szLocalIPAddress, SOCKET_ADDRESS_ANY_STRING) == 0)
        tSocketAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    else
    {
        // Convert IP address for binding
        if ((tSocketAddress.sin_addr.s_addr = inet_addr(ptParameters->m_szLocalIPAddress)) == ERROR)
        {
            // Error
            CreateSocketMessage(ptParameters, "Unknown IP address.");

            // Cleanup socket
            close(*ptParameters->m_pnIDReference);
            *ptParameters->m_pnIDReference = ERROR;

            // Not successful
            return FALSE;
        }
    }

    // Bind the socket to the local address
    if (bind(*ptParameters->m_pnIDReference, (struct sockaddr *)&tSocketAddress, nSocketAddressSize) == ERROR)
    {
        // Error
        CreateSocketMessage(ptParameters, "Socket bind failed.");

        // Cleanup socket
        close(*ptParameters->m_pnIDReference);
        *ptParameters->m_pnIDReference = ERROR;

        // Not successful
        return FALSE;
    }
}

// Receive socket
if (ptParameters->m_eType == SOCKTYPE_RECEIVE || ptParameters->m_eType == SOCKTYPE_RECEIVE_AND_TRANSMIT)
{
    // Set the receive buffer size
    nSocketOption = SOCKET_BUFFER_SIZE;
    if (setsockopt(*ptParameters->m_pnIDReference, SOL_SOCKET, SO_RCVBUF, (char *)&nSocketOption, sizeof(nSocketOption)) == ERROR)
    {
        // Error
        CreateSocketMessage(ptParameters, "Socket buffer size set failed.");

        // Cleanup socket
        close(*ptParameters->m_pnIDReference);
        *ptParameters->m_pnIDReference = ERROR;

        // Not successful
        return FALSE;
    }
}
Run Code Online (Sandbox Code Playgroud)

并且套接字接收在无限循环中调用:
*缓冲区大小肯定足够大

int SocketReceive(int nSocketIndex, char *pBuffer, int nBufferLength)
{
    int nBytesReceived = 0;
    char szError[256];

    // Invalid index or socket
    if (nSocketIndex < 0 || nSocketIndex >= SOCKET_COUNT || g_pnSocketIDs[nSocketIndex] == 0)
    {
        sprintf(szError,"SocketReceive: Invalid socket (%d) or ID (%d)", nSocketIndex, g_pnSocketIDs[nSocketIndex]);
        perror(szError);
        return -1;
    }

    // Invalid buffer length
    if (nBufferLength == 0)
    {
        perror("SocketReceive: zero buffer length");
        return 0;
    }

    // Send data
    nBytesReceived = recv(g_pnSocketIDs[nSocketIndex], pBuffer, nBufferLength, 0);

    // Error in receiving
    if (nBytesReceived == ERROR)
    {
        // Create error string
        sprintf(szError, "SocketReceive: Data Receive Failure: <%d> ", errno);

        // Set error message
        perror(szError);

        // Return error
        return ERROR;
    }

    // Bytes received
    return nBytesReceived;
}
Run Code Online (Sandbox Code Playgroud)

有关将缓冲区大小增加到4,800的原因的任何线索都可以成功一致地读取低速消息?

rpy*_*rpy 0

如果不详细分析 UDP 消息发送路径上的每个网络堆栈实现,几乎不可能说明结果行为。

UDP 实现可以自行决定丢弃任何数据包。通常,当堆栈得出需要丢弃数据包才能接收新数据包的结论时,就会发生这种情况。没有正式要求丢弃的数据包是最旧的或最新接收的。也可能是由于内部内存管理策略,某个大小类别受到的影响更大。

从所涉及的 IP 堆栈中,最有趣的是接收机器上的 IP 堆栈。

如果您将接收端更改为接收缓冲区,该缓冲区需要几秒钟的时间才能充满预期消息,那么您肯定会获得更好的接收体验。我至少从10k开始。

从 4,799 到 4,800 时观察到的行为“变化”可能是由于后者仅允许在需要再次丢弃其中一条小消息之前接收它,而较小的大小只会导致它稍微提前丢弃。如果接收应用程序足够快地读取待处理消息,则在一种情况下您将收到小消息,而在另一种情况下则不会收到小消息。