NoP*_*God 6 c# system.reactive
我在这里挣扎.通常我会读一本书,但还没有.我已经找到了无数的使用RX读取流的各种事情的例子,但我发现很难理解.
我知道我可以使用Observable.FromAsyncPattern来创建Stream的BeginRead/EndRead或BeginReadLine/EndReadLine方法的包装器.
但这只读了一次 - 当第一个观察者订阅时.
我想要一个Observable,它将继续读取和泵送OnNext,直到流错误或结束.
除此之外,我还想知道如何与多个订阅者共享该observable,以便他们都能获得这些项目.
您可以使用它Repeat来继续阅读行直到流结束,Publish或者Replay控制多个读者之间的共享。
用于从任何流读取行直至结束的简单、完整的 Rx 解决方案的示例如下:
public static IObservable<string> ReadLines(Stream stream)
{
return Observable.Using(
() => new StreamReader(stream),
reader => Observable.FromAsync(reader.ReadLineAsync)
.Repeat()
.TakeWhile(line => line != null));
}
Run Code Online (Sandbox Code Playgroud)
该解决方案还利用了到达流末尾时ReadLine返回的事实。null
解决方案是使用 Observable.Create
这是一个可以适用于读取任何类型的流的示例
public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
{
return Observable.Create<Command>(async (subject, token) =>
{
try
{
while (true)
{
if (token.IsCancellationRequested)
{
subject.OnCompleted();
return;
}
//this part here can be changed to something like this
//int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);
Command cmd = await reader.ReadCommandAsync();
subject.OnNext(cmd);
}
}
catch (Exception ex)
{
try
{
subject.OnError(ex);
}
catch (Exception)
{
Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
throw;
}
}
}).Publish();
}
Run Code Online (Sandbox Code Playgroud)
不要忘记在返回的 IConnectableObservable 上调用 Connect()