Wil*_*lka 35 c# system.reactive
使用Reactive Extensions,我想忽略来自我的Subscribe
方法运行时发生的事件流的消息.也就是说,处理消息的时间有时比消息之间的时间长,所以我想删除没有时间处理的消息.
但是,当我的Subscribe
方法完成时,如果有任何消息确实通过,我想处理最后一个消息.所以我总是处理最新消息.
所以,如果我有一些代码可以:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
Run Code Online (Sandbox Code Playgroud)
如果我们假设'100'需要很长时间来处理.然后我希望在'100'完成时处理'2'.应该忽略'1',因为它仍然被'2'取代,而'100'仍在处理中.
这是我想要使用后台任务的结果示例 Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
Run Code Online (Sandbox Code Playgroud)
但是,Latest()是一个阻塞调用,我宁愿不让一个线程等待下一个这样的值(消息之间有时会有很长的间隙).
我也可以使用BroadcastBlock
来自TPL Dataflow的结果得到我想要的结果,如下所示:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
Run Code Online (Sandbox Code Playgroud)
但感觉它应该可以直接在Rx中使用.这是最好的方法吗?
这是一个类似于Dave但使用的方法Sample
(比缓冲更合适).我在Dave的回答中添加了类似的扩展方法.
扩展名:
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
var sampler = new Subject<Unit>();
var sub = source.
Sample(sampler).
ObserveOn(Scheduler.ThreadPool).
Subscribe(l =>
{
action(l);
sampler.OnNext(Unit.Default);
});
// start sampling when we have a first value
source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
return sub;
}
Run Code Online (Sandbox Code Playgroud)
请注意,它更简单,并且没有触发的"空"缓冲区.发送给动作的第一个元素实际上来自流本身.
用法很简单:
messages.SubscribeWithoutOverlap(n =>
{
Console.WriteLine("start: " + n);
Thread.Sleep(500);
Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
Run Code Online (Sandbox Code Playgroud)
结果:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
Run Code Online (Sandbox Code Playgroud)
感谢 Lee Campbell(以Intro To Rx闻名),我现在有了一个使用此扩展方法的可行解决方案:
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
Notification<T> outsideNotification = null;
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Materialize().Subscribe(thisNotification =>
{
bool alreadyActive;
lock (gate)
{
alreadyActive = active;
active = true;
outsideNotification = thisNotification;
}
if (!alreadyActive)
{
cancelable.Disposable = scheduler.Schedule(self =>
{
Notification<T> localNotification = null;
lock (gate)
{
localNotification = outsideNotification;
outsideNotification = null;
}
localNotification.Accept(observer);
bool hasPendingNotification = false;
lock (gate)
{
hasPendingNotification = active = (outsideNotification != null);
}
if (hasPendingNotification)
{
self();
}
});
}
});
return new CompositeDisposable(disposable, cancelable);
});
}
Run Code Online (Sandbox Code Playgroud)