使用Rx和Await来逐行完成读取文件的异常

Eve*_*ers 5 .net c# system.reactive async-await

我正在学习使用RX并试用这个样本.但是无法修复突出显示的while语句中发生的异常 - 而(!f.EndofStream)

我想逐行读取一个巨大的文件 - 对于每一行数据 - 我想在不同的线程中进行一些处理(所以我使用了ObserverOn)我希望整个事情都是异步的.我想使用ReadLineAsync,因为它返回TASK,所以我可以将它转换为Observables并订阅它.

我想我首先创建的任务线程介于Rx线程之间.但即使我使用currentThread使用Observe和Subscribe,我仍然无法阻止异常.不知道我是如何用Rx完成这个整齐的Aysnc.

想知道整件事情是否可以做得更简单?

    static void Main(string[] args)
    {
        RxWrapper.ReadFileWithRxAsync();
        Console.WriteLine("this should be called even before the file read begins");
        Console.ReadLine();
    }

    public static async Task ReadFileWithRxAsync()
    {
        Task t = Task.Run(() => ReadFileWithRx());
        await t;
    }


    public static void ReadFileWithRx()
    {
        string file = @"C:\FileWithLongListOfNames.txt";
        using (StreamReader f = File.OpenText(file))
        {
            string line = string.Empty;
            bool continueRead = true;

            ***while (!f.EndOfStream)***
            {
                f.ReadLineAsync()
                       .ToObservable()
                       .ObserveOn(Scheduler.Default)
                       .Subscribe(t =>
                           {
                               Console.WriteLine("custom code to manipulate every line data");
                           });
            }

        }
    }
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 10

例外是InvalidOperationException- 我对FileStream的内部结构并不熟悉,但根据异常消息,这是因为在流上有一个正在进行的异步操作.这意味着您必须等待任何ReadLineAsync()电话完成才能完成检查EndOfStream.

Matthew Finlay为您的代码提供了一个巧妙的重新编写,以解决这个直接的问题.但是,我认为它有自己的问题 - 而且还有一个更大的问题需要加以研究.让我们看一下问题的基本要素:

  • 你有一个非常大的文件.
  • 您想要异步处理它.

这表明您不希望整个文件在内存中,您希望在处理完成时得到通知,并且可能您希望尽快处理该文件.

两个解决方案都使用一个线程来处理每一行(ObserveOn将每一行传递给线程池中的一个线程).这实际上不是一种有效的方法.

看看这两种解决方案,有两种可能性:

  • A.它需要更多的平均时间来阅读比它处理它一个文件行.
  • B. 读取文件行平均花费的时间少于处理文件行所需的时间.

A.文件读取的行比处理行慢

在A的情况下,系统在等待文件IO完成时基本上将花费大部分时间空闲.在这种情况下,Matthew的解决方案不会导致内存填满 - 但值得一看的是,如果ReadLines直接在紧密循环中使用会因较少的线程争用而产生更好的结果.(ObserveOn把线推到另一个线程只会给你买点东西,如果ReadLines没有提前打电话MoveNext- 我怀疑它确实 - 但测试看看!)

B.文件读取行比处理行更快

在B的情况下(我假设更有可能给出您尝试的内容),所有这些行将开始在内存中排队,并且对于足够大的文件,您将最终在内存中占据大部分.

您应该注意,除非您的处理程序触发异步代码来处理一行,否则所有行都将被串行处理,因为Rx保证OnNext()处理程序调用不会重叠.

这个ReadLines()方法很棒,因为它返回一个IEnumerable<string>并且它是你驱动读取文件的枚举.但是,当您调用ToObservable()它时,它将尽可能快地枚举以生成可观察事件 - Rx中没有反馈(在反应程序中称为"背压")以减慢此过程.

问题不在于ToObservable自身 - 它就是问题ObserveOn.ObserveOn不会阻止OnNext()在等待订阅者完成事件之前调用它的处理程序 - 它会尽可能快地将事件排队到目标调度程序.

如果你删除了ObserveOn,那么 - 只要你的OnNext处理程序是同步的 - 你会看到每一行都被读取并一次处理一行,因为它ToObservable()正在处理与处理程序相同的线程上的枚举.

如果这不是您想要的,并且您尝试通过在订阅者中触发异步作业(例如Task.Run(() => /* process line */或类似)来追求并行处理,那么事情就不会像您希望的那样好.

由于处理线路比读取线路需要更长的时间,因此您将创建越来越多的与传入线路保持同步的任务.线程数将逐渐增加,您将使线程池挨饿.

在这种情况下,Rx真的不太适合.

您可能需要的是少量工作线程(每个处理器核心可能有1个),它们一次获取一行代码,并限制内存中文件的行数.

一种简单的方法可以是这种方法,它将内存中的行数限制为固定数量的工作者.这是一个基于拉式的解决方案,在这种情况下这是一个更好的设计:

private Task ProcessFile(string filePath, int numberOfWorkers)
{
    var lines = File.ReadLines(filePath);       

    var parallelOptions = new ParallelOptions {
        MaxDegreeOfParallelism = numberOfWorkers
    };  

    return Task.Run(() => 
        Parallel.ForEach(lines, parallelOptions, ProcessFileLine));
}

private void ProcessFileLine(string line)
{
    /* Your processing logic here */
    Console.WriteLine(line);
}
Run Code Online (Sandbox Code Playgroud)

并像这样使用它:

static void Main()
{       
    var processFile = ProcessFile(
        @"C:\Users\james.world\Downloads\example.txt", 8);

    Console.WriteLine("Processing file...");        
    processFile.Wait();
    Console.WriteLine("Done");
}
Run Code Online (Sandbox Code Playgroud)

最后的笔记

有一些方法可以处理Rx中的背压(搜索SO以进行一些讨论) - 但这并不是Rx处理得好的,我认为最终解决方案的可读性低于上面的替代方案.您还可以查看许多其他方法(基于actor的方法,如TPL Dataflows,或LMAX Disruptor样式的环形缓冲区,用于高性能无锁方法),但从队列中提取工作的核心思想将会很普遍.

即使在这个分析中,我也很方便地掩盖你正在做什么来处理文件,并且默认假设每行的处理是计算绑定的并且是真正独立的.如果有工作要合并结果和/或IO活动来存储输出,那么所有的赌注都是关闭的 - 你还需要仔细检查这方面的效率.

在大多数情况下,正在考虑并行执行工作时,通常会有很多变量,因此最好测量每种方法的结果以确定最佳方法.测量是一门艺术 - 确保测量真实场景,平均每次测试的多次运行并在运行之间正确地重置环境(例如消除缓存效应)以减少测量误差.