agn*_*aft 8 c linux multithreading posix epoll
我正在编写一个小型服务器,它将从多个来源接收数据并处理这些数据.收到的消息来源和数据非常重要,但epoll应该能够很好地处理.但是,必须解析所有接收到的数据并运行大量的测试,这些测试非常耗时,并且尽管进行了epoll多路复用,仍会阻塞单个线程.基本上,模式应该如下所示:IO循环接收数据并将其捆绑到作业中,发送到池中可用的第一个线程,捆绑由作业处理,结果传递到IO循环写入文件.
我决定选择一个IO线程和N个工作线程.使用以下示例提供的用于接受tcp连接和读取数据的IO线程很容易实现:http: //linux.die.net/man/7/epoll
线程通常也很容易处理,但我正在努力将epoll IO循环与线程池以优雅的方式结合起来.我无法找到任何与在线工作池使用epoll的"最佳实践",但有关同一主题的相关问题.
因此,我有一些问题,希望有人能帮我回答:
编辑:一种可能的解决方案是从IO循环更新环形缓冲区,更新后通过所有工作人员的共享管道将环形缓冲区索引发送给工作者(从而将该索引的控制权交给第一个读取该索引的工作人员关闭管道索引),让工人拥有该索引直到处理结束,然后再通过管道将索引号发送回IO线程,从而给予回控制?
我的应用程序仅限Linux,因此我可以使用仅限Linux的功能,以便以最优雅的方式实现这一目标.不需要跨平台支持,但性能和线程安全性是必需的.
在我的测试中,每个线程一个epoll实例的性能远远优于复杂的线程模型。如果将侦听器套接字添加到所有epoll实例,则工作人员将简单地accept(2)获得胜利,并且将向获胜者授予连接并在整个生命周期内对其进行处理。
您的工人可能看起来像这样:
for (;;) {
nfds = epoll_wait(worker->efd, &evs, 1024, -1);
for (i = 0; i < nfds; i++)
((struct socket_context*)evs[i].data.ptr)->handler(
evs[i].data.ptr,
evs[i].events);
}
Run Code Online (Sandbox Code Playgroud)
每个添加到epoll实例的文件描述符都可以struct socket_context与之关联:
void listener_handler(struct socket_context* ctx, int ev)
{
struct socket_context* conn;
conn->fd = accept(ctx->fd, NULL, NULL);
conn->handler = conn_handler;
/* add to calling worker's epoll instance or implement some form
* of load balancing */
}
void conn_handler(struct socket_context* ctx, int ev)
{
/* read all available data and process. if incomplete, stash
* data in ctx and continue next time handler is called */
}
void dummy_handler(struct socket_context* ctx, int ev)
{
/* handle exit condition async by adding a pipe with its
* own handler */
}
Run Code Online (Sandbox Code Playgroud)
我喜欢此策略,因为:
read(2)错误的工人;accept(2));accept(2)。关于epoll的一些注意事项:
EAGAIN;dup(2)家庭通话,以免出现意外(epoll注册文件描述符,但实际上监视文件描述);epoll_ctl(2)安全地其他线程的epoll实例;struct epoll_event缓冲区epoll_wait(2)以避免饥饿。其他注意事项:
accept4(2)保存系统调用;poll(2)/ select(2)如果连接数较少,可能会更快。我希望这有帮助。
在执行这个模型时,因为我们只有在完全接收到数据包后才知道数据包的大小,不幸的是我们无法将接收本身卸载到工作线程。相反,我们仍然可以做的最好的事情是接收数据的线程,该线程必须将指针传递给完全接收的数据包。
数据本身可能最好保存在循环缓冲区中,但是我们希望每个输入源都有一个单独的缓冲区(如果我们得到一个部分数据包,我们可以继续从其他源接收数据而不拆分数据。剩下的问题是如何通知新数据包准备就绪时的工作人员,并给他们一个指向该数据包中数据的指针。因为这里的数据很少,所以只有一些指针,最优雅的方法是使用 posix 消息队列。这些提供了多个发送者和多个接收者写入和读取消息的能力,始终确保每条消息都被准确地接收到 1 个线程。
对于每个数据源,您都需要一个类似于以下结构的结构,我现在将详细介绍字段目的。
struct DataSource
{
int SourceFD;
char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
char *LatestPacket;
char *CurrentLocation
int SizeLeft;
};
Run Code Online (Sandbox Code Playgroud)
SourceFD 显然是所讨论数据流的文件描述符,DataBuffer 是在处理数据包时保存数据包内容的地方,它是一个循环缓冲区。最新数据包指针用于临时保存指向最重发数据包的指针,以防我们收到部分数据包并在传递数据包之前移动到另一个源。CurrentLocation 存储最新数据包的结束位置,以便我们知道在哪里放置下一个数据包,或者在部分接收的情况下继续执行。左边的大小是缓冲区中剩余的空间,这将用于判断我们是否可以容纳数据包或需要返回到开头。
因此,接收功能将有效地
工作线程将使用接收到的指针进行处理,然后增加 SizeLeft,以便接收线程知道它可以继续填充缓冲区。将需要原子函数来处理结构中的 size 值,因此我们不会使用 size 属性获得竞争条件(因为它可能由工作线程和 IO 线程同时写入,导致写入丢失,请参阅我的评论),它们列在此处,简单且非常有用。
现在,我已经给出了一些一般背景,但将具体解决给出的要点:
最后,你的编辑是相当明智的,除了我所建议的事实,消息队列比这里的管道好得多,因为它们非常有效地发出事件信号,保证完整的消息读取并提供自动框架。
我希望这会有所帮助,但是现在已经很晚了,所以如果我遗漏了任何内容或者您有任何疑问,请随时发表评论以获得澄清或更多解释。