创建Observable的正确方法是什么,该Observable将流读取到最后

NoP*_*God 6 c# system.reactive

我在这里挣扎.通常我会读一本书,但还没有.我已经找到了无数的使用RX读取流的各种事情的例子,但我发现很难理解.

我知道我可以使用Observable.FromAsyncPattern来创建Stream的BeginRead/EndRead或BeginReadLine/EndReadLine方法的包装器.

但这只读了一次 - 当第一个观察者订阅时.

我想要一个Observable,它将继续读取和泵送OnNext,直到流错误或结束.

除此之外,我还想知道如何与多个订阅者共享该observable,以便他们都能获得这些项目.

glo*_*pes 7

您可以使用它Repeat来继续阅读行直到流结束,Publish或者Replay控制多个读者之间的共享。

用于从任何流读取行直至结束的简单、完整的 Rx 解决方案的示例如下:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}
Run Code Online (Sandbox Code Playgroud)

该解决方案还利用了到达流末尾时ReadLine返回的事实。null


NoP*_*God 1

解决方案是使用 Observable.Create

这是一个可以适用于读取任何类型的流的示例

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    //this part here can be changed to something like this
                    //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }
Run Code Online (Sandbox Code Playgroud)

不要忘记在返回的 IConnectableObservable 上调用 Connect()