kkm*_*kkm 5 c# system.reactive async-await
我有一个asnyc函数,我想在IObservable序列中的每个观察中调用,一次限制一个事件的传递.消费者希望飞行中不超过一条消息; 如果我理解正确的话,这也是RX合约.
考虑这个样本:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Run Code Online (Sandbox Code Playgroud)
该Consume函数伪造750毫秒的处理时间,并ob每100毫秒产生一次事件.上面的代码有效,但调用task.Wait()随机线程.如果我在注释掉的第3行中订阅,则以与生成事件Consume相同的速率调用ob(我甚至无法理解Subscribe我在此注释语句中使用的重载,因此它可能是无意义的).
那么如何从可观察序列到async函数一次正确地传递一个事件呢?
Lee*_*ell 11
订阅者不应该长时间运行,因此不支持在Subscribe处理程序中执行长时间运行的异步方法.
相反,请将您的异步方法视为单个值可观察序列,该序列从另一个序列中获取值.现在你可以编写序列,这就是Rx的设计目的.
现在你已经实现了这个飞跃,你可能会有类似@Reijher在Howto中从rx subscribe回调异步函数的东西?.
他的代码细分如下.
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
在此方案中,您将创建隐式队列.在生产者比消费者更快的任何问题中,需要使用队列在等待时收集值.我个人更喜欢通过将数据放入队列来使其显式化.或者,您可以明确地使用调度程序来发出信号,表明该应该是松弛的线程模型.
对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步).有很多原因的指导是不把它们放在你的用户,例如:1.你打破误差模型2.你混合异步模型(在这里RX,任务有)3.认购是组合物的消费者异步序列.异步方法只是一个单独的值序列,因此该视图不能是序列的结尾,但结果可能是.
UPDATE
为了说明关于打破错误模型的评论,这是OP样本的更新.
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
Run Code Online (Sandbox Code Playgroud)
在这里我们可以看到,如果OnNext处理程序要抛出,那么我们就不受我们的Rx OnError处理程序的保护.该异常将无法处理,很可能会导致应用程序失效.
| 归档时间: |
|
| 查看次数: |
5268 次 |
| 最近记录: |