Smi*_*tch 15 c++ linux io-uring
我正在开发一个项目,需要在单个 Linux 服务器上以非常高的速度将数据流式传输到磁盘。使用以下命令的fio基准测试表明我应该能够使用 io_uring 获得所需的写入速度(> 40 GB/s)。
\nfio --name=seqwrite --rw=write --direct=1 --ioengine=io_uring --bs=128k --numjobs=4 --size=100G --runtime=300 --directory=/mnt/md0/ --iodepth=128 --buffered=0 --numa_cpu_nodes=0 --sqthread_poll=1\xc2\xa0 --hipri=1\n
Run Code Online (Sandbox Code Playgroud)\n但是,我无法使用自己的代码复制这种性能,因为我的代码使用了io_uring 的liburing帮助程序库。我目前的写入速度约为 9 GB/s。我怀疑 liburing 的额外开销可能是瓶颈,但在放弃更漂亮的 liburing 代码之前,我有几个关于我的方法的问题要问。
\nwritev()
,而是排队请求以使用普通write()
函数写入磁盘。(尝试收集/分散 IO 请求,但这似乎对我的写入速度没有重大影响。)NUM_JOBS
。但是,它没有告诉我内核为 sq 轮询创建的线程。bpftrace -e \'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\\n", comm, pid);}\'
在一个单独的终端中使用,这表明专用于 sq 轮询的内核线程处于活动状态。IORING_SETUP_ATTACH_WQ
我在设置戒指时尝试使用旗帜。如果说有什么不同的话,那就是这减慢了速度。下面的代码是一个简化版本,为了简洁起见,删除了很多错误处理代码。然而,这个简化版本的性能和功能与全功能代码相同。
\n#include <fcntl.h>\n#include <liburing.h>\n#include <cstring>\n#include <thread>\n#include <vector>\n#include "utilities.h"\n\n#define NUM_JOBS 4 // number of single-ring threads\n#define QUEUE_DEPTH 128 // size of each ring\n#define IO_BLOCK_SIZE 128 * 1024 // write block size\n#define WRITE_SIZE (IO_BLOCK_SIZE * 10000) // Total number of bytes to write\n#define FILENAME "/mnt/md0/test.txt" // File to write to\n\nchar incomingData[WRITE_SIZE]; // Will contain the data to write to disk\n\nint main() \n{\n // Initialize variables\n std::vector<std::thread> threadPool;\n std::vector<io_uring*> ringPool;\n io_uring_params params;\n int fds[2];\n\n int bytesPerThread = WRITE_SIZE / NUM_JOBS;\n int bytesRemaining = WRITE_SIZE % NUM_JOBS;\n int bytesAssigned = 0;\n \n utils::generate_data(incomingData, WRITE_SIZE); // this just fills the incomingData buffer with known data\n\n // Open the file, store its descriptor\n fds[0] = open(FILENAME, O_WRONLY | O_TRUNC | O_CREAT);\n \n // initialize Rings\n ringPool.resize(NUM_JOBS);\n for (int i = 0; i < NUM_JOBS; i++)\n {\n io_uring* ring = new io_uring;\n\n // Configure the io_uring parameters and init the ring\n memset(¶ms, 0, sizeof(params));\n params.flags |= IORING_SETUP_SQPOLL;\n params.sq_thread_idle = 2000;\n io_uring_queue_init_params(QUEUE_DEPTH, ring, ¶ms);\n io_uring_register_files(ring, fds, 1); // required for sq polling\n\n // Add the ring to the pool\n ringPool.at(i) = ring;\n }\n \n // Spin up threads to write to the file\n threadPool.resize(NUM_JOBS);\n for (int i = 0; i < NUM_JOBS; i++)\n {\n int bytesToAssign = (i != NUM_JOBS - 1) ? bytesPerThread : bytesPerThread + bytesRemaining;\n threadPool.at(i) = std::thread(writeToFile, 0, ringPool[i], incomingData + bytesAssigned, bytesToAssign, bytesAssigned);\n bytesAssigned += bytesToAssign;\n }\n\n // Wait for the threads to finish\n for (int i = 0; i < NUM_JOBS; i++)\n {\n threadPool[i].join();\n }\n\n // Cleanup the rings\n for (int i = 0; i < NUM_JOBS; i++)\n {\n io_uring_queue_exit(ringPool[i]);\n }\n\n // Close the file\n close(fds[0]);\n\n return 0;\n}\n
Run Code Online (Sandbox Code Playgroud)\nvoid writeToFile(int fd, io_uring* ring, char* buffer, int size, int fileIndex)\n{\n io_uring_cqe *cqe;\n io_uring_sqe *sqe;\n\n int bytesRemaining = size;\n int bytesToWrite;\n int bytesWritten = 0;\n int writesPending = 0;\n\n while (bytesRemaining || writesPending)\n {\n while(writesPending < QUEUE_DEPTH && bytesRemaining)\n {\n /* In this first inner loop,\n * Write up to QUEUE_DEPTH blocks to the submission queue\n */\n\n bytesToWrite = bytesRemaining > IO_BLOCK_SIZE ? IO_BLOCK_SIZE : bytesRemaining;\n sqe = io_uring_get_sqe(ring);\n if (!sqe) break; // if can\'t get a sqe, break out of the loop and wait for the next round\n io_uring_prep_write(sqe, fd, buffer + bytesWritten, bytesToWrite, fileIndex + bytesWritten);\n sqe->flags |= IOSQE_FIXED_FILE;\n \n writesPending++;\n bytesWritten += bytesToWrite;\n bytesRemaining -= bytesToWrite;\n if (bytesRemaining == 0) break;\n }\n\n io_uring_submit(ring);\n\n while(writesPending)\n {\n /* In this second inner loop,\n * Handle completions\n * Additional error handling removed for brevity\n * The functionality is the same as with errror handling in the case that nothing goes wrong\n */\n\n int status = io_uring_peek_cqe(ring, &cqe);\n if (status == -EAGAIN) break; // if no completions are available, break out of the loop and wait for the next round\n \n io_uring_cqe_seen(ring, cqe);\n\n writesPending--;\n }\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n
Jen*_*boe 13
您的 fio 示例正在使用 O_DIRECT,您自己的示例正在执行缓冲 IO。这是一个相当大的变化... 除此之外,您还使用 fio 进行轮询 IO,但您的示例不是。轮询 IO 将设置 IORING_SETUP_IOPOLL 并确保底层设备已配置轮询(请参阅 nvme 的 poll_queues=X)。我怀疑您最终还是会使用 fio 进行 IRQ 驱动 IO,以防万一一开始配置不正确。
更多注意事项 - fio 还设置了一些最佳标志,例如延迟任务运行和单一发行者。如果内核足够新,那就会产生影响,尽管对于这个工作负载来说并不是什么疯狂的事情。
最后,您正在使用注册的文件。这显然很好,如果您重用文件描述符,这是一个很好的优化。但这不是 SQPOLL 的要求,它很久以前就消失了。
总之,您正在运行的 fio 作业和您编写的代码执行的操作截然不同。不是同类比较。
编辑:fio 作业也是 4 个线程写入自己的文件,您的示例似乎是 4 个线程写入同一个文件。这显然会让事情变得更糟,特别是因为您的示例是缓冲 IO,并且因此您最终会在 inode 锁上出现大量争用。
归档时间: |
|
查看次数: |
2023 次 |
最近记录: |