Bea*_*ats 4 c# iterator system.reactive async-await c#-5.0
我试图将下面的类转换为懒惰返回一个文件.
public class ObservableFile2 : IObservable<string>
{
private readonly IObservable<string> subject;
public ObservableFile2(string fileName)
{
subject = Observable.Using<string, StreamReader>
(
() => new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)),
streamReader => ObserveLines(streamReader)
);
}
private IObservable<string> ObserveLines(StreamReader streamReader)
{
return ReadLines(streamReader).ToObservable();
}
private IEnumerable<string> ReadLines(StreamReader streamReader)
{
while (!streamReader.EndOfStream)
{
yield return streamReader.ReadLine();
}
}
public IDisposable Subscribe(IObserver<string> observer)
{
return subject.Subscribe(observer);
}
}
Run Code Online (Sandbox Code Playgroud)
我现在正试图将其转换为使用
StreamReader.ReadLineAsync()
Run Code Online (Sandbox Code Playgroud)
甚至更好的是将数据分块,即
await SourceStream.ReadAsync(buffer, 0, chunkSize).
Run Code Online (Sandbox Code Playgroud)
我似乎没有掌握如何包装和解包任务
欢迎提供援助.
谢谢
我不是 Rx大师,所以可能有比我的答案更好的方法.
我相信这应该可以使用async-enabled Create:
public static class ObservableFile2
{
public static IObservable<string> Create(string fileName)
{
return Observable.Create<string>(async (subject, token) =>
{
try
{
using (var streamReader = new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
while (true)
{
token.ThrowIfCancellationRequested();
var line = await streamReader.ReadLineAsync();
if (line == null)
{
subject.OnCompleted();
return;
}
subject.OnNext(line);
}
}
}
catch (Exception ex)
{
subject.OnError(ex);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)