Eve*_*ers 5 .net c# system.reactive async-await
我正在学习使用RX并试用这个样本.但是无法修复突出显示的while语句中发生的异常 - 而(!f.EndofStream)
我想逐行读取一个巨大的文件 - 对于每一行数据 - 我想在不同的线程中进行一些处理(所以我使用了ObserverOn)我希望整个事情都是异步的.我想使用ReadLineAsync,因为它返回TASK,所以我可以将它转换为Observables并订阅它.
我想我首先创建的任务线程介于Rx线程之间.但即使我使用currentThread使用Observe和Subscribe,我仍然无法阻止异常.不知道我是如何用Rx完成这个整齐的Aysnc.
想知道整件事情是否可以做得更简单?
static void Main(string[] args)
{
RxWrapper.ReadFileWithRxAsync();
Console.WriteLine("this should be called even before the file read begins");
Console.ReadLine();
}
public static async Task ReadFileWithRxAsync()
{
Task t = Task.Run(() => ReadFileWithRx());
await t;
}
public static void ReadFileWithRx()
{
string file = @"C:\FileWithLongListOfNames.txt";
using (StreamReader f = File.OpenText(file))
{
string line = string.Empty;
bool continueRead = true;
***while (!f.EndOfStream)***
{
f.ReadLineAsync()
.ToObservable()
.ObserveOn(Scheduler.Default)
.Subscribe(t =>
{
Console.WriteLine("custom code to manipulate every line data");
});
}
}
}
Run Code Online (Sandbox Code Playgroud)
Jam*_*rld 10
例外是InvalidOperationException- 我对FileStream的内部结构并不熟悉,但根据异常消息,这是因为在流上有一个正在进行的异步操作.这意味着您必须等待任何ReadLineAsync()电话完成才能完成检查EndOfStream.
Matthew Finlay为您的代码提供了一个巧妙的重新编写,以解决这个直接的问题.但是,我认为它有自己的问题 - 而且还有一个更大的问题需要加以研究.让我们看一下问题的基本要素:
这表明您不希望整个文件在内存中,您希望在处理完成时得到通知,并且可能您希望尽快处理该文件.
两个解决方案都使用一个线程来处理每一行(ObserveOn将每一行传递给线程池中的一个线程).这实际上不是一种有效的方法.
看看这两种解决方案,有两种可能性:
在A的情况下,系统在等待文件IO完成时基本上将花费大部分时间空闲.在这种情况下,Matthew的解决方案不会导致内存填满 - 但值得一看的是,如果ReadLines直接在紧密循环中使用会因较少的线程争用而产生更好的结果.(ObserveOn把线推到另一个线程只会给你买点东西,如果ReadLines没有提前打电话MoveNext- 我怀疑它确实 - 但测试看看!)
在B的情况下(我假设更有可能给出您尝试的内容),所有这些行将开始在内存中排队,并且对于足够大的文件,您将最终在内存中占据大部分.
您应该注意,除非您的处理程序触发异步代码来处理一行,否则所有行都将被串行处理,因为Rx保证OnNext()处理程序调用不会重叠.
这个ReadLines()方法很棒,因为它返回一个IEnumerable<string>并且它是你驱动读取文件的枚举.但是,当您调用ToObservable()它时,它将尽可能快地枚举以生成可观察事件 - Rx中没有反馈(在反应程序中称为"背压")以减慢此过程.
问题不在于ToObservable自身 - 它就是问题ObserveOn.ObserveOn不会阻止OnNext()在等待订阅者完成事件之前调用它的处理程序 - 它会尽可能快地将事件排队到目标调度程序.
如果你删除了ObserveOn,那么 - 只要你的OnNext处理程序是同步的 - 你会看到每一行都被读取并一次处理一行,因为它ToObservable()正在处理与处理程序相同的线程上的枚举.
如果这不是您想要的,并且您尝试通过在订阅者中触发异步作业(例如Task.Run(() => /* process line */或类似)来追求并行处理,那么事情就不会像您希望的那样好.
由于处理线路比读取线路需要更长的时间,因此您将创建越来越多的与传入线路保持同步的任务.线程数将逐渐增加,您将使线程池挨饿.
在这种情况下,Rx真的不太适合.
您可能需要的是少量工作线程(每个处理器核心可能有1个),它们一次获取一行代码,并限制内存中文件的行数.
一种简单的方法可以是这种方法,它将内存中的行数限制为固定数量的工作者.这是一个基于拉式的解决方案,在这种情况下这是一个更好的设计:
private Task ProcessFile(string filePath, int numberOfWorkers)
{
var lines = File.ReadLines(filePath);
var parallelOptions = new ParallelOptions {
MaxDegreeOfParallelism = numberOfWorkers
};
return Task.Run(() =>
Parallel.ForEach(lines, parallelOptions, ProcessFileLine));
}
private void ProcessFileLine(string line)
{
/* Your processing logic here */
Console.WriteLine(line);
}
Run Code Online (Sandbox Code Playgroud)
并像这样使用它:
static void Main()
{
var processFile = ProcessFile(
@"C:\Users\james.world\Downloads\example.txt", 8);
Console.WriteLine("Processing file...");
processFile.Wait();
Console.WriteLine("Done");
}
Run Code Online (Sandbox Code Playgroud)
有一些方法可以处理Rx中的背压(搜索SO以进行一些讨论) - 但这并不是Rx处理得好的,我认为最终解决方案的可读性低于上面的替代方案.您还可以查看许多其他方法(基于actor的方法,如TPL Dataflows,或LMAX Disruptor样式的环形缓冲区,用于高性能无锁方法),但从队列中提取工作的核心思想将会很普遍.
即使在这个分析中,我也很方便地掩盖你正在做什么来处理文件,并且默认假设每行的处理是计算绑定的并且是真正独立的.如果有工作要合并结果和/或IO活动来存储输出,那么所有的赌注都是关闭的 - 你还需要仔细检查这方面的效率.
在大多数情况下,正在考虑并行执行工作时,通常会有很多变量,因此最好测量每种方法的结果以确定最佳方法.测量是一门艺术 - 确保测量真实场景,平均每次测试的多次运行并在运行之间正确地重置环境(例如消除缓存效应)以减少测量误差.
| 归档时间: |
|
| 查看次数: |
1399 次 |
| 最近记录: |