读多线程的大txt文件?

obd*_*dgy 22 c#

我有100000行的大型txt文件.我需要启动n-count个线程并从该文件中为每个线程提供uniq行.做这个的最好方式是什么?我想我需要逐行读取文件,迭代器必须是全局的才能锁定它.将txt文件加载到列表将非常耗时,我可以收到OutofMemory异常.有任何想法吗?用一些代码帮助plz.

dtb*_*dtb 37

您可以使用File.ReadLines方法逐行读取文件而不立即将整个文件加载到内存中,并使用Parallel.ForEach方法并行处理多个线程中的行:

Parallel.ForEach(File.ReadLines("file.txt"), (line, _, lineNumber) =>
{
    // your code here
});
Run Code Online (Sandbox Code Playgroud)

  • https://dotnetfiddle.net/wX7VhA 可能对 @SteffenWinkler 感兴趣。请注意,第 3 项在第 1 项结束后开始 - 而不是在第 2 项结束后。我不相信你的成堆担忧是有效的。 (2认同)
  • @mjwills 嗯,经过一些进一步的尝试/测试后,我不得不同意你的看法。我最初的观察一定是巧合,或者我没有对发生的事情给予足够的关注。但是,我要注意的一件事是 Parallel.Foreach 似乎将条目列表划分为可用线程的数量,并且每个线程执行一个子列表。因此,线程 1 获取条目 1 - 20,线程 2 获取条目 21 - 40,而不是仅获取下一个可用条目。 (2认同)

Jak*_*rew 15

在执行我自己的基准测试以将61,277,203行加载到内存中并将值推送到Dictionary/ConcurrentDictionary()之后,结果似乎支持@dtb上面的答案,使用以下方法是最快的:

Parallel.ForEach(File.ReadLines(catalogPath), line =>
{

}); 
Run Code Online (Sandbox Code Playgroud)

我的测试还显示以下内容:

  1. File.ReadAllLines()和File.ReadAllLines().AsParallel()似乎在这个大小的文件上以几乎完全相同的速度运行.看看我的CPU活动,看起来它们似乎都使用了我的8个内核中的两个?
  2. 首先使用File.ReadAllLines()读取所有数据似乎比在Parallel.ForEach()循环中使用File.ReadLines()要慢得多.
  3. 我还尝试了一个生产者/消费者或MapReduce样式模式,其中一个线程用于读取数据,另一个线程用于处理它.这似乎也没有超越上面的简单模式.

我已经包含了此模式的示例以供参考,因为它未包含在此页面中:

var inputLines = new BlockingCollection<string>();
ConcurrentDictionary<int, int> catalog = new ConcurrentDictionary<int, int>();

var readLines = Task.Factory.StartNew(() =>
{
    foreach (var line in File.ReadLines(catalogPath)) 
        inputLines.Add(line);

        inputLines.CompleteAdding(); 
});

var processLines = Task.Factory.StartNew(() =>
{
    Parallel.ForEach(inputLines.GetConsumingEnumerable(), line =>
    {
        string[] lineFields = line.Split('\t');
        int genomicId = int.Parse(lineFields[3]);
        int taxId = int.Parse(lineFields[0]);
        catalog.TryAdd(genomicId, taxId);   
    });
});

Task.WaitAll(readLines, processLines);
Run Code Online (Sandbox Code Playgroud)

这是我的基准:

在此输入图像描述

我怀疑在某些处理条件下,生产者/消费者模式可能胜过简单的Parallel.ForEach(File.ReadLines())模式.但是,它并没有在这种情况下.


das*_*ght 7

在一个线程上读取文件,将其行添加到阻塞队列.启动N从该队列中读取的任务.设置队列的最大大小以防止出现内存不足错误.


Daa*_*mer 5

就像是:

public class ParallelReadExample
{
    public static IEnumerable LineGenerator(StreamReader sr)
    {
        while ((line = sr.ReadLine()) != null)
        {
            yield return line;
        }
    }

    static void Main()
    {
        // Display powers of 2 up to the exponent 8:
        StreamReader sr = new StreamReader("yourfile.txt")

        Parallel.ForEach(LineGenerator(sr), currentLine =>
            {
                // Do your thing with currentLine here...
            } //close lambda expression
        );

        sr.Close();
    }
}
Run Code Online (Sandbox Code Playgroud)

认为它会起作用。(这里没有 C# 编译器/IDE)

  • @obdgy:你为什么要这样做? (2认同)