Observable.FromAsync+Repeat+TakeWhile 组合创建无限循环

Dan*_*ler 5 c# system.reactive reactive

有人可以解释为什么AsObservable即使到达流末尾,以下方法也会创建无限循环吗?

public static class StreamExt {
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);

        if (bytesRead < 1) return null; // EndOfStream
        var result_size = Math.Min(bytesRead, bufferSize);
        Array.Resize(ref buf, result_size);
        return buf;
    }
}
Run Code Online (Sandbox Code Playgroud)

快速测试表明它会产生无限循环:

class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservable(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,我可以添加一个.SubscribeOn(TaskPoolScheduler.Default),但是,无限循环保持活动状态(阻止任务池调度程序 + 无限读取Stream)。

[更新 2017-05-09]

Shlomo 发布了一个更好的例子来重现这个问题:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
Run Code Online (Sandbox Code Playgroud)

Mar*_* L. 4

对于最终来到这里并需要答案而不仅仅是解释的任何人:问题似乎是 的默认调度程序,如这个自我回答的问题FromAsync所示。如果您调整为“当前线程”调度程序的行为会更加可预测。例如(问题摘录):Repeat().TakeWhile(...)

.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), 
    System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
Run Code Online (Sandbox Code Playgroud)