Mar*_*und 5 c# enumerable system.reactive observable
This is a rather educational, out-of-curiosity question. Consider the following snippet:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
Where SubscribeDebug subscribes a simple observer:
public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}
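The output of this is:
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
And then it blocks. Can someone help me understand the underlying reason why this happens and why the observable does not complete? I have noticed that it does complete without the Concat call, but blocks with it.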
I've looked at the source of ToObservable and distilled a minimal implementation. It does reproduce the behavior we're seeing.
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
    ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
    Observable.Create<T>
    (
        observer =>
        {
            IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
            {
                if (enumerator.MoveNext())
                {
                    observer.OnNext(enumerator.Current);
                    inner.Schedule(enumerator, loopRec); //<-- culprit
                }
                else
                {
                    observer.OnCompleted();
                }
                // ToObservable.cs Line 117
                // We never allow the scheduled work to be cancelled.
                return Disposable.Empty;
            }
            return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
        }
    );
With that out of the way - the crux of the problem lies in the behavior of CurrentThreadScheduler, which is the default scheduler used.
CurrentThreadScheduler works like this: if Schedule is called while a scheduled action is already running on the current thread, the new work is queued instead of being run immediately.
CurrentThreadScheduler.Instance.Schedule(() =>
{
    CurrentThreadScheduler.Instance.Schedule(() =>
        Console.WriteLine(1)
    );
    Console.WriteLine(2);
});
This prints 2 1. This queuing behavior is our undoing.
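To make the queuing rule concrete without pulling in System.Reactive, here is a minimal sketch of a trampoline. MiniTrampoline and TrampolineDemo are hypothetical names made up for illustration, and this is not how CurrentThreadScheduler is actually implemented (the real one also handles due times and cancellation); it only mimics the "queue if something is already running on this thread" rule described above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for CurrentThreadScheduler; not the Rx implementation.
public static class MiniTrampoline
{
    // One queue per thread; non-null only while a scheduled action is running.
    [ThreadStatic] private static Queue<Action> queue;

    public static void Schedule(Action action)
    {
        if (queue != null)
        {
            // Already inside a scheduled action on this thread: defer the work.
            queue.Enqueue(action);
            return;
        }

        queue = new Queue<Action>();
        try
        {
            action();
            // Drain whatever the running actions queued, in FIFO order.
            while (queue.Count > 0)
                queue.Dequeue()();
        }
        finally
        {
            queue = null;
        }
    }
}

public static class TrampolineDemo
{
    public static List<int> Run()
    {
        var log = new List<int>();
        MiniTrampoline.Schedule(() =>
        {
            MiniTrampoline.Schedule(() => log.Add(1)); // queued, runs after the outer action
            log.Add(2);                                // runs first
        });
        return log; // contains 2, then 1
    }
}
```

The important detail is that queued work only runs after the currently executing action returns. Anything that keeps the current action from returning also keeps the queue from draining.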
When observer.OnCompleted() is called, it causes Concat to start the next enumeration - however, things are not the same as when we started out - we are still inside the observer => { } block when we try to schedule the next one. So instead of executing immediately, the next schedule gets queued.
Now enumerator.MoveNext() is caught in a deadlock.
It can't move to the next item: MoveNext blocks until the next item arrives, and that item can only arrive once the queued ToObservable loop gets to run.
But the scheduler can only run that queued work (which would notify ToEnumerable and, in turn, release the waiting MoveNext()) after it exits loopRec, and it can't exit loopRec because MoveNext is blocking it in the first place.
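The shape of that deadlock can be sketched in isolation. This is an assumption-laden model, not the Rx code path: DeadlockSketch and its private Schedule are hypothetical, the "producer" is just a gate.Release() call, and a timeout replaces the infinite wait so the sketch terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class DeadlockSketch
{
    // Hypothetical single-threaded trampoline standing in for CurrentThreadScheduler.
    private static Queue<Action> pending;

    private static void Schedule(Action action)
    {
        if (pending != null) { pending.Enqueue(action); return; } // nested call: defer
        pending = new Queue<Action>();
        try
        {
            action();
            while (pending.Count > 0) pending.Dequeue()();
        }
        finally { pending = null; }
    }

    // Returns true if the consumer was signalled in time, false if it timed out.
    public static bool ConsumerGetsItem(TimeSpan timeout)
    {
        var gate = new SemaphoreSlim(0);
        var received = false;
        Schedule(() =>
        {
            // Producer: queued, because we are already inside a scheduled action.
            Schedule(() => gate.Release());
            // Consumer: blocks the only thread that could drain the queue.
            // The real code waits forever; the timeout keeps this sketch finite.
            received = gate.Wait(timeout);
        });
        return received;
    }
}
```

Calling DeadlockSketch.ConsumerGetsItem(TimeSpan.FromMilliseconds(200)) returns false: the producer only runs after the waiting action gives up and returns, which is the circular dependency described above.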
Addendum
This is approximately what ToEnumerable (from GetEnumerator.cs) does (not a valid implementation):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
    var gate = new SemaphoreSlim(0);
    var queue = new ConcurrentQueue<T>();
    using (observable.Subscribe(
        value => { queue.Enqueue(value); gate.Release(); },
        () => gate.Release()))
    {
        while (true)
        {
            gate.Wait(); //this is where it blocks
            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }
}
Enumerables are expected to be blocking until the next item is yielded - and that's why there's a gating implementation. It's not Enumerable.Range which blocks, but ToEnumerable.
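For contrast, the same gating pattern is harmless when the producer runs on a different thread than the blocked consumer. The sketch below is illustrative only: GateDemo is a hypothetical name, the producer is a plain Task rather than an observable, and it is not a statement about how Rx itself should be configured.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class GateDemo
{
    public static List<int> Run()
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<int>();

        // Producer on a thread-pool thread: one Release per item, plus one for completion.
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < 5; i++)
            {
                queue.Enqueue(i);
                gate.Release();
            }
            gate.Release(); // completion signal: the consumer finds the queue empty
        });

        var result = new List<int>();
        while (true)
        {
            gate.Wait(); // blocks the consumer until the producer signals
            if (queue.TryDequeue(out var current))
                result.Add(current);
            else
                break;
        }

        producer.Wait();
        return result; // 0, 1, 2, 3, 4
    }
}
```

Here the wait always ends, because the thread that releases the gate is never the thread that is stuck waiting on it.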