使用Reactive Extensions,我想忽略来自我的Subscribe方法运行时发生的事件流的消息.也就是说,处理消息的时间有时比消息之间的时间长,所以我想删除没有时间处理的消息.
但是,当我的Subscribe方法完成时,如果有任何消息确实通过,我想处理最后一个消息.所以我总是处理最新消息.
所以,如果我有一些代码可以:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
Run Code Online (Sandbox Code Playgroud)
如果我们假设'100'需要很长时间来处理.然后我希望在'100'完成时处理'2'.应该忽略'1',因为它仍然被'2'取代,而'100'仍在处理中.
这是我想要使用后台任务的结果示例 Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
Run Code Online (Sandbox Code Playgroud)
但是,Latest()是一个阻塞调用,我宁愿不让一个线程等待下一个这样的值(消息之间有时会有很长的间隙).
我也可以使用BroadcastBlock来自TPL Dataflow的结果得到我想要的结果,如下所示:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
Run Code Online (Sandbox Code Playgroud)
但感觉它应该可以直接在Rx中使用.这是最好的方法吗?