Sam*_* S. 7 c# infinite-loop asyncsocket system.reactive
在stream.DataAvailable为false之前,如何进行以下可观察重复?目前它看起来永远不会停止.
在Defer部分内的AsyncReadChunk和Observable.Return使OnNext调用然后OnCompleted调用.当Repeat接收OnNext调用时,它将它传递给TakeWhile.当TakeWhile不满意时,它完成了observable,但我认为在OnNext之后出现的OnCompleted非常快,它会使Repeat重新订阅observable并导致无限循环.
我该如何纠正这种行为?
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
    return Observable.Defer(() =>
        {
            try
            {
                return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
            }
            catch (Exception)
            {
                return Observable.Return(new byte[0]);
            }
        })
        .Repeat()
        .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
自答:(下面是问题作者 Samet 发布的答案。但是,他将答案作为问题的一部分发布。我将其移至单独的答案中,标记为社区 wiki,因为作者还没有“是他自己动的。)
我通过重构发现是调度器的问题。Return 使用 Immediate 调度程序,而 Repeat 使用 CurrentThread。固定代码如下。
    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Defer(() =>
                                    {
                                        try
                                        {
                                            return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                        catch (Exception)
                                        {
                                            return Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                    })
            .Repeat()
            .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
    }