jwb*_*ley 4 c sockets linux networking multithreading
我已经阅读了数据包手册页和一些博客| 试图了解如何使用 PACKET_FANOUT 套接字选项来扩展接收数据的处理的帖子(我希望使用 SOCK_RAW 以高速捕获流量,> 10Gbps)。我已经通读了这个示例代码(复制如下),但我不确定我是否完全理解它。
让我们想象一个场景;网卡上已经设置了RSS,入口流量在RX队列之间均匀分布,有一个8核CPU和8个网卡RX队列,每个RX队列[0-7]分别向CPU[0-7]发送一个中断(关于 MMAP、零拷贝、poll() 等的进一步讨论不在此处讨论)。
这是我在示例代码中看到的事件顺序:
setup_socket()
绑定到同一物理 NIC、promisc 模式和同一 FANOUT 组的所有部分创建一个套接字(我们再次说 0-7)。read()
上的套接字才会在对该套接字进行调用时显示可用数据(因此套接字 0 由线程 0 创建only),然后由于此标志,数据仅复制到该线程的用户空间接收缓冲区。第 4 点是我对这个过程的理解中的主要疑问点。我是否正确理解在这种情况下如何使用 PACKET_FANOUT 进行缩放,以及我们如何将工作线程锁定到处理中断的同一核心?
void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
// setup_socket() calls socket() (using SOCK_RAW) to created the socketFD,
// setsockopt() to enable promisc mode on the NIC,
// bind() to bind the socketFD to NIC,
// and setsockopt() again to set PACKET_FANOUT + PACKET_FANOUT_CPU
int packet_socket = setup_socket(interface_name, fanout_group_id);
if (packet_socket == -1) {
printf("Can't create socket\n");
return;
}
unsigned int capture_length = 1500;
char buffer[capture_length];
while (true) {
received_packets++;
int readed_bytes = read(packet_socket, buffer, capture_length);
// printf("Got %d bytes from interface\n", readed_bytes);
consume_pkt((u_char*)buffer, readed_bytes);
if (readed_bytes < 0) {
break;
}
}
}
...
bool use_multiple_fanout_processes = true;
// Could get some speed up on NUMA servers
bool execute_strict_cpu_affinity = false;
int main() {
boost::thread speed_printer_thread( speed_printer );
int fanout_group_id = getpid() & 0xffff;
if (use_multiple_fanout_processes) {
boost::thread_group packet_receiver_thread_group;
unsigned int num_cpus = 8;
for (int cpu = 0; cpu < num_cpus; cpu++) {
boost::thread::attributes thread_attrs;
if (execute_strict_cpu_affinity) {
cpu_set_t current_cpu_set;
int cpu_to_bind = cpu % num_cpus;
CPU_ZERO(¤t_cpu_set);
// We count cpus from zero
CPU_SET(cpu_to_bind, ¤t_cpu_set);
int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set);
if (set_affinity_result != 0) {
printf("Can't set CPU affinity for thread\n");
}
}
packet_receiver_thread_group.add_thread(
new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id))
);
}
// Wait all processes for finish
packet_receiver_thread_group.join_all();
} else {
start_af_packet_capture("eth6", 0);
}
speed_printer_thread.join();
}
Run Code Online (Sandbox Code Playgroud)
编辑:奖金问题
这可能太无关了,在这种情况下请提供建议,我将开始单独的 SO 帖子。这里的目标不仅是跨多个内核扩展数据包处理,而且还将数据包处理代码放置在接收该数据包的同一个内核上(稍后将探索 MMAP 和 RX_RING),以便减少上下文切换和缓存未命中中央处理器。我的理解是这里正在实现这个目标,有人可以确认或否认吗?
据我所知,不,不完全是。 fanout_demux_cpu
使用 cpu 和扇出组中的套接字数量计算“哈希”,恰好是smp_processor_id() % num
. packet_rcv_fanout
然后将其用作扇出组中套接字数组的索引,以确定哪个套接字获取它。
一旦您看到扇出组的整个设计是基于根据接收到的数据包的属性创建某种散列,而不是基于尝试读取套接字的线程的属性,您可能应该让调度程序解决问题而不是固定线程。
或者,您可以进一步深入研究代码以对数组中套接字的顺序进行逆向工程,但这会很脆弱,您可能需要使用systemtap验证您是否正确执行了此操作。然后,您可以按确定性顺序创建套接字(希望在数组中产生确定性顺序)并将侦听给定套接字的线程固定到适当的 CPU。