使用Rx,如何在我的Subscribe方法运行时忽略all-except-the-latest值

Wil*_*lka 35 c# system.reactive

使用Reactive Extensions,我想忽略来自我的Subscribe方法运行时发生的事件流的消息.也就是说,处理消息的时间有时比消息之间的时间长,所以我想删除没有时间处理的消息.

但是,当我的Subscribe方法完成时,如果有任何消息确实通过,我想处理最后一个消息.所以我总是处理最新消息.

所以,如果我有一些代码可以:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
Run Code Online (Sandbox Code Playgroud)

如果我们假设'100'需要很长时间来处理.然后我希望在'100'完成时处理'2'.应该忽略'1',因为它仍然被'2'取代,而'100'仍在处理中.

这是我想要使用后台任务的结果示例 Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});
Run Code Online (Sandbox Code Playgroud)

但是,Latest()是一个阻塞调用,我宁愿不让一个线程等待下一个这样的值(消息之间有时会有很长的间隙).

我也可以使用BroadcastBlock来自TPL Dataflow的结果得到我想要的结果,如下所示:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
Run Code Online (Sandbox Code Playgroud)

但感觉它应该可以直接在Rx中使用.这是最好的方法吗?

yam*_*men 9

这是一个类似于Dave但使用的方法Sample(比缓冲更合适).我在Dave的回答中添加了类似的扩展方法.

扩展名:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}
Run Code Online (Sandbox Code Playgroud)

请注意,它更简单,并且没有触发的"空"缓冲区.发送给动作的第一个元素实际上来自流本身.

用法很简单:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
Run Code Online (Sandbox Code Playgroud)

结果:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
Run Code Online (Sandbox Code Playgroud)

  • 这有一个问题,如果源在调用sampler.OnNext时没有在样本缓冲区中放置任何内容,那么系统将进入不再生成任何值的状态.我使用Switch而不是示例http://stackoverflow.com/a/15876519/158285对此进行了修改 (3认同)

Wil*_*lka 4

感谢 Lee Campbell(以Intro To Rx闻名),我现在有了一个使用此扩展方法的可行解决方案:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
Run Code Online (Sandbox Code Playgroud)

  • @AndrewHanlon 使用通知而不仅仅是值来处理异常,否则它们将无法正确地传递到 OnError 通道。 (2认同)