首先,一些上下文来解释我为什么选择"UDP采样"路径:
我想在一段未知的时间内快速生成数据.我想要采样的数据是在另一台机器上,而不是消耗数据的机器.我在两者之间有专用的以太网连接,因此带宽不是问题.我遇到的问题是消耗数据的机器比生成数据的机器慢得多.一个附加的约束是,虽然我没有得到所有样本(它们只是样本),但我必须得到最后一个样本.
我的第一个解决方案是让数据生成器为每个生成的样本发送一个UDP数据报,让数据使用者尝试获取它可能的样本,并在UDP套接字已满时让其他人被套接字层丢弃.这个解决方案的问题是,当新的UDP数据报到达并且套接字已满时,新数据报将被丢弃而不是旧数据报.因此,我没有保证拥有最后一个!
我的问题是:有没有办法让UDP套接字在新到达时替换旧的数据报?
接收器目前是一台Linux机器,但是未来可能会改变另一种类似unix的操作系统(Windows可能会实现BSD套接字,但不太可能)
理想的解决方案会使用广泛的机制(如setsockopt()) s)工作.
PS:我想到了其他解决方案,但它们更复杂(涉及发件人的大量修改),因此我想首先对我要求的可行性有一个明确的地位!:)
更新:
- 我知道接收机器上的操作系统可以处理网络负载+重新组装发送方生成的流量.它的默认行为是在套接字缓冲区已满时丢弃新的数据报.而且由于接收过程中的处理时间,我知道无论我做什么它都会变满(浪费一半的内存在套接字缓冲区上不是一个选项:)).
- 我真的希望避免让一个辅助进程执行操作系统在数据包分派时可能完成的操作,并浪费资源只是在SHM中复制消息.
- 我看到修改发件人的问题是我可以访问的代码只是一个PleaseSendThisData()函数,它不知道它可能是最后一次在很长一段时间之前被调用,所以我不知道看到任何可行的伎俩...但我愿意接受建议!:)
如果真的没有办法改变BSD套接字中的UDP接收行为,那么......告诉我,我准备接受这个可怕的事实,并且当我回到时,我将开始研究"帮助程序"解决方案它:)
我同意“咖啡馆”。将套接字设置为非阻塞模式。
每当您在套接字上收到一些东西时 - 循环读取,直到不再剩下任何东西。然后处理最后读取的数据报。
只有一个注意事项:您应该为套接字设置一个大的系统接收缓冲区
int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));
Run Code Online (Sandbox Code Playgroud)
仅在侦听器端很难完全正确,因为它实际上可能会错过网络接口芯片中的最后一个数据包,这将使您的程序没有机会看到它。
操作系统的 UDP 代码将是尝试处理此问题的最佳位置,因为即使它决定丢弃新数据包,它也会收到新数据包,因为它已经有太多排队的数据包。然后它可以决定丢弃旧的或丢弃新的,但我不知道如何告诉它这就是你想要它做的事情。
您可以尝试在接收器上处理此问题,方法是让一个程序或线程始终尝试读取最新的数据包,而另一个程序或线程始终尝试获取最新的数据包。根据您是作为两个单独的程序还是作为两个线程执行此操作,如何执行此操作会有所不同。
作为线程,您需要一个互斥锁(信号量或类似的东西)来保护指向用于保存 1 UDP 有效负载的结构的指针(或引用)以及您想要的任何其他内容(大小、发送者 IP、发送者端口、时间戳等) )。
实际从套接字读取数据包的线程会将数据包的数据存储在结构中,获取保护该指针的互斥体,将当前指针替换为指向刚刚创建的结构体的指针,释放互斥体,向处理器线程发出信号有事情要做,然后清除它刚刚获得指针的结构,并用它来保存下一个进入的数据包。
实际处理数据包有效负载的线程应该等待来自其他线程的信号和/或定期(500 毫秒左右可能是一个很好的起点,但您决定)并获取互斥体,将其指针交换到 UDP 有效负载结构与现有的结构,释放互斥体,然后如果结构有任何数据包数据,它应该处理它,然后等待下一个信号。如果它没有任何数据,它应该继续等待下一个信号。
处理器线程可能应该以低于 UDP 侦听器的优先级运行,以便侦听器不太可能错过数据包。当处理最后一个数据包(您真正关心的数据包)时,处理器不会被中断,因为没有新的数据包可供侦听器听到。
您可以通过使用队列而不仅仅是单个指针作为两个线程的交换位置来扩展此功能。单指针只是一个长度为1的队列,非常容易处理。
您还可以通过尝试让侦听器线程检测是否有多个数据包在等待来扩展此功能,并且仅将最后一个数据包实际放入处理器线程的队列中。执行此操作的方式因平台而异,但如果您使用的是 *nix,那么对于没有任何等待的套接字,这应该返回 0:
while (keep_doing_this()) {
ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len);
// this could have been recv or recvfrom
if (len < 0) {
error();
}
int sz;
int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
if (rc < 0) {
error();
}
if (!sz) {
// There aren't any more packets ready, so queue up the one we got
my_udp_packet->current_len = len;
my_udp_packet = swap_udp_packet(my_ucp_packet);
/* swap_udp_packet is code you would have to write to implement what I talked
about above. */
tgkill(this_group, procesor_thread_tid, SIGUSR1);
} else if (sz > my_udp_packet->buf_len) {
/* You could resize the buffer for the packet payload here if it is too small.*/
}
}
Run Code Online (Sandbox Code Playgroud)
必须为每个线程分配一个 udp_packet,并为交换指针分配 1 个。如果您使用队列进行交换,那么您必须为队列中的每个位置提供足够的 udp_packets ——因为指针只是一个长度为 1 的队列,因此它只需要 1。
如果您使用的是 POSIX 系统,请考虑不要使用实时信号来发送信号,因为它们会排队。使用常规信号将允许您将多次收到信号视为与仅收到一次信号相同,直到信号被处理,而实时信号会排队。定期醒来检查队列还可以让您处理在检查是否有任何新数据包之后但在调用pause等待信号之前到达最后一个信号的可能性。
| 归档时间: |
|
| 查看次数: |
2542 次 |
| 最近记录: |