为什么可以等待Rx可观察?

mar*_*ark 7 c# system.reactive async-await

我刚刚注意到该await关键字可以与Rx Observable一起使用,例如:

await Observable.Interval(TimeSpan.FromHours(1));
Run Code Online (Sandbox Code Playgroud)

我很确定它只能与Tasks一起使用.

那是什么让它成为可能?可观察的知识是否硬编码到编译器中?

Mik*_*ray 7

不,编译器没有特殊的知识IObservable<T>.根据C#5规范的7.7.7.1节,如果对象有一个方法,或者在名为scope的扩展方法中GetAwaiter返回一个实现的类型System.Runtime.CompilerServices.INotifyCompletion,则可以等待它.请参阅Steven Toub的文章,等待任何事情.

更具体地说,来自规范

等待表达式的任务必须是等待的.如果满足以下条件之一,则表达式t等待的:
- t是编译时类型dynamic
- t具有可访问的实例或名为GetAwaiter的扩展方法,没有参数且没有类型参数,以及返回类型A,其中包含以下所有内容保持:
1.实现接口System.Runtime.CompilerServices.INotifyCompletion(以下简称为INotifyCompletion)2.A
具有bool
3 类型的可访问,可读的实例属性IsCompleted .A具有可访问的实例方法GetResult,不带参数和没有类型参数

请注意这与foreach不需要的方式类似,IEnumerable<T>只是一个返回兼容对象的GetEnumerator方法.这种鸭子类型是一种性能优化,它允许编译器使用值类型而无需装箱.这可用于避免性能敏感代码中不必要的分配.


Ned*_*nov 7

我认为这是因为System.Reactive.Linq定义了一个GetAwaiter扩展方法IObservable.正如@mike z解释的那样,允许你等待一个IObservable.这是方法:

public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return s_impl.GetAwaiter<TSource>(source);
}
Run Code Online (Sandbox Code Playgroud)

返回的类型AsyncSubject<T>实现INotifyCompletion并具有IsCompleted属性和GetResult方法.

public sealed class AsyncSubject<T> : ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable, INotifyCompletion
{
    // Fields
    private Exception _exception;
    private readonly object _gate;
    private bool _hasValue;
    private bool _isDisposed;
    private bool _isStopped;
    private ImmutableList<IObserver<T>> _observers;
    private T _value;

    // Methods
    public AsyncSubject();
    private void CheckDisposed();
    public void Dispose();
    public AsyncSubject<T> GetAwaiter();
    public T GetResult();
    public void OnCompleted();
    public void OnCompleted(Action continuation);
    private void OnCompleted(Action continuation, bool originalContext);
    public void OnError(Exception error);
    public void OnNext(T value);
    public IDisposable Subscribe(IObserver<T> observer);

    // Properties
    public bool HasObservers { get; }
    public bool IsCompleted { get; }
Run Code Online (Sandbox Code Playgroud)