高吞吐量非阻塞服务器设计:繁忙等待的替代方案

5 c++ algorithm multithreading producer-consumer busy-waiting

我一直在为多媒体消息构建一个高吞吐量的服务器应用程序,实现语言是C++。每台服务器可以独立使用,也可以将多台服务器连接在一起,创建一个基于DHT的覆盖网络;服务器就像Skype一样的超级对等点。

工作正在进行中。目前,服务器每秒可以处理大约 200,000 条消息(256 字节消息),并且在我的机器(Intel i3 Mobile 2 GHz、Fedora Core 18(64 位)、4 GB RAM)上的最大吞吐量约为 256 MB/s长度为 4096 字节的消息。服务器有两个线程,一个线程用于处理所有 IO(基于 epoll,边缘触发),另一个线程用于处理传入的消息。覆盖管理还有另一个线程,但在当前的讨论中无关紧要。

讨论中的两个线程使用两个循环缓冲区共享数据。线程编号 1 使用一个循环缓冲区为线程编号 2 的新消息入队,而线程编号 2 通过另一个循环缓冲区返回已处理的消息。服务器是完全无锁的。我没有使用任何同步原语,甚至没有使用原子操作。

循环缓冲区永远不会溢出,因为消息是池化的(在启动时预先分配)。事实上,所有重要/经常使用的数据结构都被集中起来以减少内存碎片并提高缓存效率,因此我们知道服务器启动时我们将创建的最大消息数,因此我们可以预先计算最大缓冲区的大小,然后相应地初始化循环缓冲区。

现在我的问题是:线程 #1 将序列化消息一次一条消息(实际上是指向消息对象的指针)排入队列,而线程 #2 以块(32/64/128 的块)从队列中取出消息,然后返回通过第二个循环缓冲区以块的形式处理消息。如果没有新消息,线程#2 会一直忙着等待,因此让 CPU 核心之一一直忙着。我怎样才能进一步改进设计?忙等待策略的替代方案是什么?我想优雅而有效地做到这一点。我考虑过使用信号量,但我担心这不是最好的解决方案,原因很简单,每次我在线程 #1 中排队消息时都必须调用“sem_post”,这可能会大大降低吞吐量,第二个线程必须调用“sem_post”等于防止信号量溢出的次数。另外我担心信号量实现可能在内部使用互斥锁。

第二个不错的选择可能是使用信号,如果我能发现一种算法,仅当第二个线程“清空队列并且正在调用 sigwait”或“已经在等待 sigwait”,简而言之信号必须提高最少次数,尽管如果信号比需要的次数多几次也不会受到伤害。是的,我确实使用过谷歌搜索,但我在互联网上找到的解决方案都不是令人满意的。以下是一些注意事项:

A. 服务器在进行系统调用时必须浪费最少的 CPU 周期,并且必须使用最少的系统调用次数。

B. 必须有非常低的开销并且算法必须是高效的。

C. 没有任何锁定。

我希望所有选项都摆在桌面上。

这是我共享服务器信息的站点的链接,以便更好地了解其功能和目的:www.wanhive.com

Evg*_*zin 2

如果您需要尽快唤醒线程#2,那么繁忙等待是很好的选择。事实上,这是向一个处理器通知另一个处理器所做的更改的最快方法。您需要在两端生成内存栅栏(一侧写栅栏,另一侧读栅栏)。但只有当两个线程都在专用处理器上执行时,此声明才成立。在这种情况下,不需要上下文切换,只需缓存一致性流量。

有一些可以改进的地方。

  1. 如果线程 #2 通常受 CPU 限制并且忙于等待 - 它可能会受到调度程序的惩罚(至少在 Windows 和 Linux 上)。操作系统调度程序动态调整线程优先级以提高系统整体性能。它降低了消耗大量 CPU 时间的 CPU 绑定线程的优先级,以防止线程饥饿。您需要手动提高线程 #2 的优先级来防止这种情况发生。
  2. 如果您拥有多核或多处理器计算机,您最终会遇到处理器订阅不足的情况,并且您的应用程序将无法利用硬件并发性。您可以通过使用多个处理器线程(线程#2)来缓解这种情况。

处理步骤的并行化。 有两种选择。

  1. 您的消息是完全有序的,需要按照消息到达时的顺序进行处理。
  2. 消息可以重新排序。处理可以按任何顺序进行。

在第一种情况下,您需要 N 个循环缓冲区、N 个处理线程、N 个输出缓冲区和一个消费者线程。线程 #1 在该循环缓冲区中按循环顺序将消息排入队列。

// Thread #1 pseudocode
auto message = recv()
auto buffer_index = atomic_increment(&message_counter);
buffer_index = buffer_index % N;  // N is the number of threads
// buffers is an array of cyclic buffers - Buffer* buffers[N];
Buffer* current_buffer = buffers[buffer_index];
current_buffer->euqueue(message);
Run Code Online (Sandbox Code Playgroud)

每个线程使用来自缓冲区之一的消息并将结果排队到其专用输出缓冲区。

// Thread #i pseudocode
auto message = my_buffer->dequeue();
auto result = process(message);
my_output_buffer->enqueue(result);
Run Code Online (Sandbox Code Playgroud)

现在您需要按到达顺序处理所有这些消息。您可以使用专用的使用者线程通过以循环方式将已处理的消息从输出循环缓冲区中出队来完成此操作。

// Consumer thread pseudocode
// out_message_counter is equal to message_counter at start
auto out_buffer_index = atomic_increment(&out_message_counter);
out_buffer_index = out_buffer_index % N;
// out_buffers is array of output buffers that is used by processing
// threads
auto out_buffer = out_buffers[out_buffer_index];
auto result = out_buffer->dequeue();
send(result);  // or whatever you need to do with result
Run Code Online (Sandbox Code Playgroud)

在第二种情况下,当您不需要保留消息顺序时 - 您不需要消费者线程和输出循环缓冲区。您只需在处理线程中对结果执行任何需要执行的操作即可。

N 必须等于num CPU's- 在第一种情况下为 3(“- 3”是一个 I/O 线程 + 一个消费者线程 + 一个 DHT 线程),num CPU's在第二种情况下 - 2(“- 2”是一个 I/O 线程 + 一个 DHT 线程) )。这是因为如果处理器超额订阅,则繁忙等待不会有效。