日志文件的并行GZip解压缩 - 为最高吞吐量调整MaxDegreeOfParallelism

mil*_*iCH 14 c# parallel-processing performance multithreading c#-4.0

我们每天最多有30 GB的GZip压缩日志文件.每个文件保存100.000行,压缩后为6到8 MB.解析逻辑已被剥离的简化代码利用Parallel.ForEach循环.

在两个NUMA节点上,MaxDegreeOfParallelism为8处理的最大行数达到峰值,32个逻辑CPU盒(Intel Xeon E7-2820 @ 2 GHz):

using System;

using System.Collections.Concurrent;

using System.Linq;
using System.IO;
using System.IO.Compression;

using System.Threading.Tasks;

namespace ParallelLineCount
{
    public class ScriptMain
    {
        static void Main(String[] args)
        {
            int    maxMaxDOP      = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
            string fileLocation   = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
            string filePattern    = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
            string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";

            Console.WriteLine("Start:                 {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
            Console.WriteLine("Processing file(s):    {0}", filePattern);
            Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
            Console.WriteLine("");

            Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");

            for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
            {

                // Construct ConcurrentStacks for resulting strings and counters
                ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
                ConcurrentStack<int>   TotalFiles = new ConcurrentStack<int>();

                DateTime FullStartTime = DateTime.Now;

                string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);

                var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };

                //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
                Parallel.ForEach(files, options, currentFile =>
                    {
                        string filename = System.IO.Path.GetFileName(currentFile);
                        DateTime fileStartTime = DateTime.Now;

                        using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
                        {
                            Int64 lines = 0, someBookLines = 0, length = 0;
                            String line = "";

                            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                            {
                                while (!reader.EndOfStream)
                                {
                                    line = reader.ReadLine();
                                    lines++; // total lines
                                    length += line.Length;  // total line length

                                    if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
                                }

                                TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
                                TotalFiles.Push(1); // silly way to count processed files :)
                            }
                        }
                    }
                );

                TimeSpan runningTime = DateTime.Now - FullStartTime;

                // Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
                Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
                    maxDOP.ToString(),
                    TotalFiles.Sum().ToString(),
                    Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
                    TotalLength.Sum().ToString(),
                    TotalLines.Sum(),
                    TotalSomeBookLines.Sum().ToString(),
                    Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
                    Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());

            }
            Console.WriteLine();
            Console.WriteLine("Finish:                " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

以下是结果摘要,MaxDegreeOfParallelism = 8处有明显的峰值:

在此输入图像描述

CPU负载(此处汇总显示,大部分负载都在单个NUMA节点上,即使DOP在20到30范围内):

在此输入图像描述

我发现使CPU负载超过95%标记的唯一方法是将文件分成4个不同的文件夹并执行相同的命令4次,每个命令都针对所有文件的子集.

有人能找到瓶颈吗?

Jim*_*hel 8

一个问题可能是默认FileStream构造函数使用的小缓冲区大小.我建议你使用更大的输入缓冲区.如:

using (FileStream infile = new FileStream(
    name, FileMode.Open, FileAccess.Read, FileShare.None, 65536))
Run Code Online (Sandbox Code Playgroud)

默认缓冲区大小为4千字节,其中有多个线程对I/O子系统进行多次调用以填充其缓冲区.64K的缓冲区意味着您可以更频繁地进行这些调用.

我发现32K到256K之间的缓冲区大小可以提供最佳性能,当我一段时间做一些详细的测试时,64K是"最佳点".大于256K的缓冲区实际上开始降低性能.

此外,虽然这不太可能对性能产生重大影响,但您可能应该ConcurrentStack使用64位整数替换这些实例并使用Interlocked.AddInterlocked.Increment更新它们.它简化了代码,无需管理集合.

更新:

重新阅读你的问题描述,我对此声明感到震惊:

我发现使CPU负载超过95%标记的唯一方法是将文件分成4个不同的文件夹并执行相同的命令4次,每个命令都针对所有文件的子集.

对我来说,这指向打开文件的瓶颈.好像操作系统在目录上使用互斥锁.即使所有数据都在缓存中并且不需要物理I/O,进程仍然需要等待此锁定.文件系统也可能正在写入磁盘.请记住,它必须在文件打开时更新文件的上次访问时间.

如果I/O确实是瓶颈,那么你可能会考虑让一个单独的线程除了加载文件并将它们填充到一个BlockingCollection或类似的数据结构中之外什么都不做,这样处理线程就不必相互竞争以获得锁定目录.您的应用程序成为具有一个生产者和N个消费者的生产者/消费者应