数百个消费者和大文件的好方法

shd*_*hda 9 .net c# multithreading task-parallel-library tpl-dataflow

我有几个文件(每个近1GB)和数据.数据是一个字符串行.

我需要与数百名消费者一起处理这些文件.这些消费者中的每一个都做一些与其他消费者不同的处 消费者不会同时写任何地方.他们只需要输入字符串.处理后,他们更新本地缓冲区.消费者可以轻松地并行执行.

重要提示:对于一个特定文件,每个消费者必须以正确的顺序处理所有行(不跳过)(因为它们出现在文件中).处理不同文件的顺序无关紧要.

一个消费者对单行的处理速度相当快.我希望Corei5上的时间不到50微秒.

所以现在我正在寻找解决这个问题的好方法.这将成为.NET项目的一部分,所以请让我们坚持使用.NET(最好是C#).

我知道TPL和DataFlow.我猜最相关的是BroadcastBlock.但我认为这里的问题是,每一行我都要等待所有消费者完成才能发布新的消费者.我猜它效率不高.

我认为理想的情况是这样的:

  1. 一个线程从文件读取并写入缓冲区.
  2. 每个消费者在准备就绪时,同时从缓冲区中读取该行并对其进行处理.
  3. 当一个消费者读取缓冲区时,不应删除缓冲区中的条目.只有当所有消费者都已处理它时,才能将其删除.
  4. TPL自己安排消费者线程.
  5. 如果一个消费者胜过其他消费者,它不应该等待,并且可以从缓冲区中读取更多最近的条目.

我采用这种方法是对的吗?无论是否,我如何实施良好的解决方案?

pap*_*zzo 1

我不同意一个线程从文件读取并写入缓冲区
在多个 1 GB 的文件中,该线程会消耗太多内存
。NET 有对象大小限制,集合是一个对象

你需要限制读取行数
我认为你可以使用 BlockingCollection 来做到这一点 bc 的 1000000 处理让最慢的消费者保持忙碌
并且它还为打开下一个文件提供了一些缓冲区

using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollection2
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }
        public static void BC_GetConsumingEnumerableCollection()
        {
            List<string> fileNames = new List<string>();  // add filesNames
            string producerLine;
            System.IO.StreamReader file;
            List<BCtaskBC> bcs = new List<BCtaskBC>();  // add for each consumer
            // Kick off a producer task
            Task.Factory.StartNew(() =>
            {
                foreach(string fileName in fileNames)
                {
                    file = new System.IO.StreamReader(fileName);
                    while ((producerLine = file.ReadLine()) != null)
                    {
                        foreach (BCtaskBC bc in bcs)
                        {
                            // string is reference type but it often acts like a value type
                            // may need to make a deep copy of producerLine for this next line
                            bc.BC.Add(producerLine);  // if  any queue size gets to 1000000 then this blocks
                        }
                    }
                    file.Close();
                }                 
                // Need to do this to keep foreach below from hanging
                foreach (BCtaskBC bc in bcs)
                {
                    bc.BC.CompleteAdding();
                }
            });

            // Now consume the blocking collection with foreach. 
            // Use bc.GetConsumingEnumerable() instead of just bc because the 
            // former will block waiting for completion and the latter will 
            // simply take a snapshot of the current state of the underlying collection. 
            //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
            Parallel.ForEach(bcs, bc =>
            {
                foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
                {
                    bc.BCtask.ProcessTask(consumerLine);  
                }
            } //close lambda expression
                 ); //close method invocation 
            // I think this need to be parallel
            //foreach (BCtaskBC bc in bcs)
            //{
            //    foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
            //    {
            //        bc.BCtask.ProcessTask(consumerLine);
            //    }
            //}
        }
        public abstract class BCtaskBC
        {   // may need to do something to make this thread safe   
            private BlockingCollection<string> bc = new BlockingCollection<string>(1000000);  // this trotttles the size
            public BCtask BCtask { get; set; }
            public BlockingCollection<string> BC { get { return bc; } }
        }
        public abstract class BCtask
        {   // may need to do something to make this thread safe 
            public void ProcessTask(string S) {}
        }
    }
}
Run Code Online (Sandbox Code Playgroud)