epoll IO与C中的工作线程

agn*_*aft 8 c linux multithreading posix epoll

我正在编写一个小型服务器,它将从多个来源接收数据并处理这些数据.收到的消息来源和数据非常重要,但epoll应该能够很好地处理.但是,必须解析所有接收到的数据并运行大量的测试,这些测试非常耗时,并且尽管进行了epoll多路复用,仍会阻塞单个线程.基本上,模式应该如下所示:IO循环接收数据并将其捆绑到作业中,发送到池中可用的第一个线程,捆绑由作业处理,结果传递到IO循环写入文件.

我决定选择一个IO线程和N个工作线程.使用以下示例提供的用于接受tcp连接和读取数据的IO线程很容易实现:http: //linux.die.net/man/7/epoll

线程通常也很容易处理,但我正在努力将epoll IO循环与线程池以优雅的方式结合起来.我无法找到任何与在线工作池使用epoll的"最佳实践",但有关同一主题的相关问题.

因此,我有一些问题,希望有人能帮我回答:

  1. 可以(并且应该)使用eventfd作为IO线程和所有工作者之间双向同步的机制吗?例如,每个工作线程是否有一个好主意让自己的epoll例程等待共享的eventfd(带有结构指针,包含有关作业的数据/信息),即以某种方式使用eventfd作为作业队列?也许还有另一个eventfd将结果从多个工作线程传递回IO线程?
  2. 在IO线程上发出关于套接字的更多数据的信号之后,实际的recv应该发生在IO线程上,还是工作者应该自己重新获取数据,以便在解析数据帧时不阻塞IO线程等?在这种情况下,我如何确保安全性,例如,如果recv在工作线程中读取1,5帧数据而另一个工作线程从同一连接接收最后0.5帧数据?
  3. 如果工作线程池是通过互斥锁等实现的,如果N + 1个线程试图使用相同的锁,那么等待锁会阻塞IO线程吗?
  4. 对于如何使用双向通信(即从IO到工作者和返回)在epoll周围构建工作线程池,是否有任何良好的实践模式?

编辑:一种可能的解决方案是从IO循环更新环形缓冲区,更新后通过所有工作人员的共享管道将环形缓冲区索引发送给工作者(从而将该索引的控制权交给第一个读取该索引的工作人员关闭管道索引),让工人拥有该索引直到处理结束,然后再通过管道将索引号发送回IO线程,从而给予回控制?

我的应用程序仅限Linux,因此我可以使用仅限Linux的功能,以便以最优雅的方式实现这一目标.不需要跨平台支持,但性能和线程安全性是必需的.

has*_*ste 6

在我的测试中,每个线程一个epoll实例的性能远远优于复杂的线程模型。如果将侦听器套接字添加到所有epoll实例,则工作人员将简单地accept(2)获得胜利,并且将向获胜者授予连接并在整个生命周期内对其进行处理。

您的工人可能看起来像这样:

for (;;) {
    nfds = epoll_wait(worker->efd, &evs, 1024, -1);

    for (i = 0; i < nfds; i++)
        ((struct socket_context*)evs[i].data.ptr)->handler(
            evs[i].data.ptr,
            evs[i].events);
}
Run Code Online (Sandbox Code Playgroud)

每个添加到epoll实例的文件描述符都可以struct socket_context与之关联:

void listener_handler(struct socket_context* ctx, int ev)
{
    struct socket_context* conn;

    conn->fd = accept(ctx->fd, NULL, NULL);
    conn->handler = conn_handler;

    /* add to calling worker's epoll instance or implement some form
     * of load balancing */
}

void conn_handler(struct socket_context* ctx, int ev)
{
    /* read all available data and process. if incomplete, stash
     * data in ctx and continue next time handler is called */
}

void dummy_handler(struct socket_context* ctx, int ev)
{
    /* handle exit condition async by adding a pipe with its
     * own handler */
}
Run Code Online (Sandbox Code Playgroud)

我喜欢此策略,因为:

  • 非常简单的设计;
  • 所有线程都是相同的;
  • 工人和人际关系是孤立的-不能踩对方的脚趾或召唤read(2)错误的工人;
  • 不需要锁(内核不必担心上的同步accept(2));
  • 由于没有忙碌的工人会积极地争取竞争,因此负载均衡些自然accept(2)

关于epoll的一些注意事项:

  • 使用边缘触发模式,无阻塞套接字,并且始终读取直到EAGAIN;
  • 避免dup(2)家庭通话,以免出现意外(epoll注册文件描述符,但实际上监视文件描述);
  • 您可以epoll_ctl(2)安全地其他线程的epoll实例;
  • 使用较大的struct epoll_event缓冲区epoll_wait(2)以避免饥饿。

其他注意事项:

  • 用于accept4(2)保存系统调用;
  • 每个内核使用一个线程(如果与CPU绑定,则每个物理线程使用1个线程;如果与I / O绑定,则每个逻辑线程使用1个线程);
  • poll(2)/ select(2)如果连接数较少,可能会更快。

我希望这有帮助。

  • 我喜欢这个想法,但是,我担心每次接收后我的繁重工作量会阻塞其他连接。另外,如果一个线程“幸运”地首先选择下一个接受,这是否可能会导致每个线程的工作负载不平衡?此外,如果我只有 4-5 个连接,我可能仍然需要 30 个工作线程来处理它们产生的内容。 (2认同)

Val*_*ity 5

在执行这个模型时,因为我们只有在完全接收到数据包后才知道数据包的大小,不幸的是我们无法将接收本身卸载到工作线程。相反,我们仍然可以做的最好的事情是接收数据的线程,该线程必须将指针传递给完全接收的数据包。

数据本身可能最好保存在循环缓冲区中,但是我们希望每个输入源都有一个单独的缓冲区(如果我们得到一个部分数据包,我们可以继续从其他源接收数据而不拆分数据。剩下的问题是如何通知新数据包准备就绪时的工作人员,并给他们一个指向该数据包中数据的指针。因为这里的数据很少,所以只有一些指针,最优雅的方法是使用 posix 消息队列。这些提供了多个发送者和多个接收者写入和读取消息的能力,始终确保每条消息都被准确地接收到 1 个线程。

对于每个数据源,您都需要一个类似于以下结构的结构,我现在将详细介绍字段目的。

struct DataSource
{
    int SourceFD;
    char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
    char *LatestPacket;
    char *CurrentLocation
    int SizeLeft;
};
Run Code Online (Sandbox Code Playgroud)

SourceFD 显然是所讨论数据流的文件描述符,DataBuffer 是在处理数据包时保存数据包内容的地方,它是一个循环缓冲区。最新数据包指针用于临时保存指向最重发数据包的指针,以防我们收到部分数据包并在传递数据包之前移动到另一个源。CurrentLocation 存储最新数据包的结束位置,以便我们知道在哪里放置下一个数据包,或者在部分接收的情况下继续执行。左边的大小是缓冲区中剩余的空间,这将用于判断我们是否可以容纳数据包或需要返回到开头。

因此,接收功能将有效地

  • 将数据包的内容复制到缓冲区中
  • 将 CurrentLocation 移动到数据包的末尾
  • 更新 SizeLeft 以考虑现在减少的缓冲区
  • 如果我们无法将数据包放入缓冲区的末尾,我们就会循环
  • 如果那里没有空间,我们稍后再试一次,同时转到另一个来源
  • 如果我们有一个部分接收存储最新数据包指针指向数据包的开始并转到另一个流,直到我们得到其余的
  • 使用posix 消息队列向工作线程发送一条消息,以便它可以处理数据,该消息将包含一个指向 DataSource 结构的指针,以便它可以对其进行处理,它还需要一个指向它正在处理的数据包的指针,以及它的大小,这些可以在我们收到数据包时计算出来

工作线程将使用接收到的指针进行处理,然后增加 SizeLeft,以便接收线程知道它可以继续填充缓冲区。将需要原子函数来处理结构中的 size 值,因此我们不会使用 size 属性获得竞争条件(因为它可能由工作线程和 IO 线程同时写入,导致写入丢失,请参阅我的评论),它们列在此处,简单且非常有用。

现在,我已经给出了一些一般背景,但将具体解决给出的要点:

  1. 使用 EventFD 作为同步机制在很大程度上是一个坏主意,您会发现自己使用了大量不需要的 CPU 时间,并且很难执行任何同步。特别是如果您有多个线程获取相同的文件描述符,您可能会遇到重大问题。这实际上是一个讨厌的黑客,有时会起作用,但不能真正替代正确的同步。
  2. 如上所述尝试卸载接收也是一个坏主意,您可以通过复杂的 IPC 解决这个问题,但坦率地说,接收 IO 不太可能需要足够的时间来停止您的应用程序,您的 IO 也可能比 CPU 慢得多所以用多线程接收将获得很少的收益。(这个假设你不说,有几个10千兆的网卡)。
  3. 在这里使用互斥锁或锁是一个愚蠢的想法,鉴于(同时)共享数据量很少,它更适合无锁编码,您实际上只是在交接工作和数据。这也将提高接收线程的性能并使您的应用程序更具可扩展性。使用这里提到的函数http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html你可以很容易地做到这一点。如果你这样做,你需要的是一个信号量,它可以在每次收到数据包时解锁,并被每个线程锁定远低于带有互斥锁的自制解决方案的开销。
  4. 这里与任何线程池并没有太大区别,您产生了很多线程,然后将它们全部阻塞在数据消息队列上的 mq_receive 中以等待消息。完成后,他们将结果发送回主线程,主线程将结果消息队列添加到其 epoll 列表中。然后它可以通过这种方式接收结果,对于像指针这样的小数据有效载荷来说,这是简单且非常有效的。这也将使用很少的 CPU,并且不会强制主线程浪费时间管理工人。

最后,你的编辑是相当明智的,除了我所建议的事实,消息队列比这里的管道好得多,因为它们非常有效地发出事件信号,保证完整的消息读取并提供自动框架。

我希望这会有所帮助,但是现在已经很晚了,所以如果我遗漏了任何内容或者您有任何疑问,请随时发表评论以获得澄清或更多解释。