如何在Observable.Do扩展方法中传递异步方法?

mar*_*ark 4 system.reactive

鉴于:

  1. IObservable<T> src
  2. async Task F(T){...}.
  3. F只能按顺序调用.所以await F(x);await F(y);很好,但是Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...});错了,因为F(x)并且F(y)不能同时运行.

我很清楚这await src.Do(F)是错误的,因为它会F同时运行.

我的问题是如何正确地做到这一点?

Dav*_*ton 6

Observable.Do仅用于副作用,不用于顺序组合. SelectMany可能是你想要的.从Rx 2.0开始,有一些重载SelectMany可以很容易地组成observable Task<T>.(请注意这些和类似的Task/Observable coop运算符可能引入的额外并发性.)

var q = from value in src
        from _ in F(value.X).AsVoidAsync()  // See helper definition below
        from __ in F(value.Y).AsVoidAsync()
        select value;
Run Code Online (Sandbox Code Playgroud)

但是,基于您特别询问Do运算符的事实,我怀疑src可能包含多个值,并且您不希望为每个值重叠调用F. 在这种情况下,考虑到SelectMany实际上就是这样Select->Merge; 因此,你可能想要的是Select->Concat.

// using System.Reactive.Threading.Tasks

var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();
Run Code Online (Sandbox Code Playgroud)

不要忘了用Defer,因为F(x)热的.

AsVoidAsync扩展:

IObservable<T>需要一个T,但Task代表无效,因此Rx的转换运算符要求我们Task<T>从a 得到一个Task.我倾向于使用Rx的System.Reactive.Unit结构T:

public static class TaskExtensions
{
  public static Task<Unit> AsVoidAsync(this Task task)
  {
    return task.ContinueWith(t =>
    {
      var tcs = new TaskCompletionSource<Unit>();

      if (t.IsCanceled)
      {
        tcs.SetCanceled();
      }
      else if (t.IsFaulted)
      {
        tcs.SetException(t.Exception);
      }
      else
      {
        tcs.SetResult(Unit.Default);
      }

      return tcs.Task;
    },
    TaskContinuationOptions.ExecuteSynchronously)
    .Unwrap();
  }
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以始终只调用专门的ToObservable方法.