相关疑难解决方法(0)

使用Rx,如何在我的Subscribe方法运行时忽略all-except-the-latest值

使用Reactive Extensions,我想忽略来自我的Subscribe方法运行时发生的事件流的消息.也就是说,处理消息的时间有时比消息之间的时间长,所以我想删除没有时间处理的消息.

但是,当我的Subscribe方法完成时,如果有任何消息确实通过,我想处理最后一个消息.所以我总是处理最新消息.

所以,如果我有一些代码可以:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
Run Code Online (Sandbox Code Playgroud)

如果我们假设'100'需要很长时间来处理.然后我希望在'100'完成时处理'2'.应该忽略'1',因为它仍然被'2'取代,而'100'仍在处理中.

这是我想要使用后台任务的结果示例 Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});
Run Code Online (Sandbox Code Playgroud)

但是,Latest()是一个阻塞调用,我宁愿不让一个线程等待下一个这样的值(消息之间有时会有很长的间隙).

我也可以使用BroadcastBlock来自TPL Dataflow的结果得到我想要的结果,如下所示:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
Run Code Online (Sandbox Code Playgroud)

但感觉它应该可以直接在Rx中使用.这是最好的方法吗?

c# system.reactive

35
推荐指数
2
解决办法
5809
查看次数

标签 统计

c# ×1

system.reactive ×1