如何依靠I/O大量正确地并行工作

Ste*_*e B 22 c# parallel-processing multithreading plinq task-parallel-library

我正在构建一个必须处理大量数据的控制台应用程序.

基本上,应用程序从数据库中获取引用.对于每个引用,解析文件的内容并进行一些更改.这些文件是HTML文件,并且该过程正在使用RegEx替换进行繁重的工作(查找引用并将它们转换为链接).然后将结果存储在文件系统中并发送到外部系统.

如果我按顺序恢复该过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}
Run Code Online (Sandbox Code Playgroud)

我的程序工作正常,但速度很慢.这就是为什么我想要并行化这个过程.

到现在为止,我做了一个简单的并行化添加AsParallel:

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});
Run Code Online (Sandbox Code Playgroud)

这种简单的改变减少了过程的持续时间(减少了25%的时间).但是,我对并行化的理解是,如果依赖于I/O的资源并行,那么将没有太多的好处(或者更糟的是,更少的好处),因为i/o不会神奇地加倍.

这就是为什么我认为我应该改变我的方法而不是并行化整个过程,而是创建依赖的链式排队任务.

IE,我应该创建一个流程:

队列读取文件.完成后,Queue ParseHtml.完成后,Queue都会发送到WS并在本地写入.完成后,记录结果.

但是,我不知道如何实现这样的思考.

我觉得它会以一组消费者/生产者队列结束,但我找不到正确的样本.

而且,我不确定是否会有好处.

谢谢你的建议

[编辑]事实上,我是使用c#4.5的完美候选人...如果只是rtm :)

[编辑2]让我觉得它没有正确并行化的另一件事是,在资源监视器中,我看到CPU,网络I/O和磁盘I/O的图表不稳定.当一个人高,其他人是低到中等

Dre*_*rsh 17

您没有在任何代码中利用任何异步I/O API.你所做的一切都是CPU限制的,所有的I/O操作都会浪费CPU资源阻塞.AsParallel对于计算绑定任务,如果要利用异步I/O,则需要在<= v4.0中利用基于异步编程模型(APM)的API.这是通过在BeginXXX/EndXXX您正在使用的基于I/O的类上查找方法并在可用时利用这些方法来完成的.

阅读这篇文章的初学者:TPL TaskFactory.FromAsync vs Tasks with blocking方法

接下来,AsParallel无论如何你都不想在这种情况下使用.AsParallel启用流式传输,这将导致每个项目立即安排一个新任务,但您不需要/想要这里.使用分区工作可以更好地完成工作Parallel::ForEach.

让我们看看如何使用这些知识在特定情况下实现最大并发性:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});
Run Code Online (Sandbox Code Playgroud)

现在,这里有几点说明:

  1. 这是示例代码,所以我使用1MB缓冲区来读/写文件.这对HTML文件来说过多,浪费了系统资源.你可以降低它以满足你的最大需求,或者实现链式读/写到StringBuilder,这是一个我留给你的练习,因为我要编写~500多行代码来做异步链接读/写.:P
  2. 你会注意到我在读/写任务的延续TaskContinuationOptions.AttachedToParent.这非常重要,因为它将阻止Parallel::ForEach启动工作的工作线程完成,直到所有底层异步调用完成.如果不在这里,您将同时启动所有5000个项目的工作,这会使TPL子系统污染数千个计划任务并且根本无法正确扩展.
  3. 我调用SendToWs并将文件写入文件共享.我不知道SendToWs的实现有什么潜在的基础,但它听起来像是一个很好的候选者来制作异步.现在它假设它是纯粹的计算工作,因此,它将在执行时刻录CPU线程.我把它作为一个练习让你弄清楚如何最好地利用我所展示的那些来提高那里的吞吐量.
  4. 这是所有类型的自由形式,我的大脑是这里唯一的编译器和SO的语法高亮是我用来确保语法是好的.所以,请原谅任何语法错误,让我知道如果我搞得太糟糕,你无法做出正面或反面,我会跟进.

  • 我们正在进入一个领域,原始问题中没有足够的信息能够提供您正在寻找的绝对最佳方法.提供的代码将受限于Parallel :: ForEach将同时执行如此多的工作任务(基于maxdegreeofparallelism和启发式)并且因为I/O工作流程步骤被链接到那些,这也将限制数量I/O操作也在进行,同时在那些时候(关键)仍然释放CPU资源.即使您使用p/c模式,您仍然希望使用异步I/O来最大化吞吐量. (2认同)

Bri*_*eon 5

好消息是您的逻辑可以很容易地分成进入生产者 - 消费者管道的步骤.

  • 第1步:读取文件
  • 第2步:解析文件
  • 第3步:写入文件
  • 第4步:SendToWs

如果您使用的是.NET 4.0,则可以使用BlockingCollection数据结构作为每个步骤的生产者 - 使用者队列的主干.主线程将每个工作项排入第1步的队列,在那里它将被拾取和处理,然后转发到第2步的队列,依此类推.

如果您愿意继续使用Async CTP,那么您也可以利用新的TPL Dataflow结构.还有就是BufferBlock<T>数据结构等等,其行为以类似的方式BlockingCollection,并与新的集成以及asyncawait关键字.

由于您的算法是IO绑定的,因此生产者 - 消费者策略可能无法为您提供所需的性能提升,但至少您将拥有一个非常优雅的解决方案,如果您可以提高IO吞吐量,该解决方案可以很好地扩展.我担心步骤1和3将成为瓶颈,管道将无法很好地平衡,但值得尝试.