使用异步函数订阅可观察序列

kkm*_*kkm 5 c# system.reactive async-await

我有一个asnyc函数,我想在IObservable序列中的每个观察中调用,一次限制一个事件的传递.消费者希望飞行中不超过一条消息; 如果我理解正确的话,这也是RX合约.

考虑这个样本:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}
Run Code Online (Sandbox Code Playgroud)

Consume函数伪造750毫秒的处理时间,并ob每100毫秒产生一次事件.上面的代码有效,但调用task.Wait()随机线程.如果我在注释掉的第3行中订阅,则以与生成事件Consume相同的速率调用ob(我甚至无法理解Subscribe我在此注释语句中使用的重载,因此它可能是无意义的).

那么如何从可观察序列到async函数一次正确地传递一个事件呢?

Lee*_*ell 11

订阅者不应该长时间运行,因此不支持在Subscribe处理程序中执行长时间运行的异步方法.

相反,请将您的异步方法视为单个值可观察序列,该序列从另一个序列中获取值.现在你可以编写序列,这就是Rx的设计目的.

现在你已经实现了这个飞跃,你可能会有类似@Reijher在Howto中从rx subscribe回调异步函数的东西.

他的代码细分如下.

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();
Run Code Online (Sandbox Code Playgroud)

在此方案中,您将创建隐式队列.在生产者比消费者更快的任何问题中,需要使用队列在等待时收集值.我个人更喜欢通过将数据放入队列来使其显式化.或者,您可以明确地使用调度程序来发出信号,表明该应该是松弛的线程模型.

对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步).有很多原因的指导是不把它们放在你的用户,例如:1.你打破误差模型2.你混合异步模型(在这里RX,任务有)3.认购是组合物的消费者异步序列.异步方法只是一个单独的值序列,因此该视图不能是序列的结尾,但结果可能是.

UPDATE

为了说明关于打破错误模型的评论,这是OP样本的更新.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}
Run Code Online (Sandbox Code Playgroud)

在这里我们可以看到,如果OnNext处理程序要抛出,那么我们就不受我们的Rx OnError处理程序的保护.该异常将无法处理,很可能会导致应用程序失效.