从Stream生成IObservable <String>的首选方法

bar*_*ong 8 c# reactive-programming system.reactive

作为我们的应用程序的一部分(现在生产大约4个月),我们有一个来自外部设备的数据流,我们将其转换为IObservable

到目前为止,我们一直在使用以下方法来生成它,并且它一直运行良好.

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return Observable
            .Create<string>(observer => Scheduler.ThreadPool
            .Schedule(() => ReadLoop(streamReader, observer)));
}

private void ReadLoop(StreamReader reader, IObserver<string> observer)
{
    while (true)
    {
        try
        {
            var line = reader.ReadLine();
            if (line != null)
            {
                observer.OnNext(line);
            }
            else
            {
                observer.OnCompleted();
                break;
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            break;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

昨晚我想知道是否有办法使用yield return语法来实现相同的结果并想出了这个:

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return ReadLoop(streamReader)
            .ToObservable(Scheduler.ThreadPool);
}

private IEnumerable<string> ReadLoop(StreamReader reader)
{
    while (true)
    {
        var line = reader.ReadLine();
        if (line != null)
        {
            yield return line;
        }
        else
        {
            yield break;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

它似乎工作得很好而且更干净,但我想知道是否有任何优点或缺点,一方面超过另一方,或者是否有一个更好的方式完全.

yam*_*men 13

我认为你有一个好主意(Stream转入Enumerable<string>当时IObservable<string>).但是,IEnumerable代码可以更清晰:

IEnumerable<string> ReadLines(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return reader.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

然后为IObservable:

IObservable<string> ObserveLines(Stream inputStream)
{
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool);
}
Run Code Online (Sandbox Code Playgroud)

这更短,更易读,并且可以正确处理流.它也很懒惰.

ToObservable扩展需要捕捉的护理OnNext事件(新线),以及该OnCompleted事件(的枚举结束)和OnError.