Ale*_*lex 30 c# system.reactive
我想有效地限制事件流,以便在收到第一个事件时调用我的委托,但如果收到后续事件则不会持续1秒.在超时(1秒)到期后,如果收到后续事件,我希望调用我的代理.
使用Reactive Extensions有一种简单的方法吗?
示例代码:
static void Main(string[] args)
{
Console.WriteLine("Running...");
var generator = Observable
.GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
.Timestamp();
var builder = new StringBuilder();
generator
.Sample(TimeSpan.FromSeconds(1))
.Finally(() => Console.WriteLine(builder.ToString()))
.Subscribe(feed =>
builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
feed.Value,
feed.Timestamp.ToString("mm:ss.fff"),
DateTime.Now.ToString("mm:ss.fff"))));
Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)
当前输出:
Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602
Run Code Online (Sandbox Code Playgroud)
但我想观察(时间戳显然会改变)
Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602
Run Code Online (Sandbox Code Playgroud)
cRi*_*ter 15
好的,
你有3个场景:
1)我想每秒获得一个事件流的值.意味着:如果它每秒产生更多事件,你将得到一个总是更大的缓冲区.
observableStream.Throttle(timeSpan)
Run Code Online (Sandbox Code Playgroud)
2)我想得到最新的事件,这是在第二次发生之前产生的意思:其他事件被丢弃.
observableStream.Sample(TimeSpan.FromSeconds(1))
Run Code Online (Sandbox Code Playgroud)
3)你想得到所有事件,发生在最后一秒.那一秒钟
observableStream.BufferWithTime(timeSpan)
Run Code Online (Sandbox Code Playgroud)
4)你想要选择在第二个与所有值之间发生的事情,直到第二个已经过去,并返回你的结果
observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
Run Code Online (Sandbox Code Playgroud)
Ser*_*hov 13
这是我在RX论坛的帮助下得到的:
我们的想法是为原始序列发布一系列"门票".这些"门票"在超时时间延迟,不包括第一个"门票",该门票立即预先加到门票序列中.当一个事件进来并且有一个等待的票时,事件立即触发,否则它等到票然后开火.当它发射时,会发出下一张票,依此类推......
要结合票证和原始事件,我们需要一个组合器.不幸的是,"标准".CombineLatest不能在这里使用,因为它会触发之前使用过的门票和事件.因此,我必须创建自己的组合器,它基本上是一个过滤的.CombineLatest,仅当组合中的两个元素都是"新鲜"时才会触发 - 之前从未返回过.我叫它.CombineVeryLatest又名.BrokenZip;)
使用.CombineVeryLatest,上面的想法可以这样实现:
public static IObservable<T> SampleResponsive<T>(
this IObservable<T> source, TimeSpan delay)
{
return source.Publish(src =>
{
var fire = new Subject<T>();
var whenCanFire = fire
.Select(u => new Unit())
.Delay(delay)
.StartWith(new Unit());
var subscription = src
.CombineVeryLatest(whenCanFire, (x, flag) => x)
.Subscribe(fire);
return fire.Finally(subscription.Dispose);
});
}
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
var ls = leftSource.Select(x => new Used<TLeft>(x));
var rs = rightSource.Select(x => new Used<TRight>(x));
var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
var fltCmb = cmb
.Where(a => !(a.x.IsUsed || a.y.IsUsed))
.Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
}
private class Used<T>
{
internal T Value { get; private set; }
internal bool IsUsed { get; set; }
internal Used(T value)
{
Value = value;
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:这是AndreasKöpf在论坛上提出的另一个更紧凑的CombineVeryLatest变体:
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
return Observable.Defer(() =>
{
int l = -1, r = -1;
return Observable.CombineLatest(
leftSource.Select(Tuple.Create<TLeft, int>),
rightSource.Select(Tuple.Create<TRight, int>),
(x, y) => new { x, y })
.Where(t => t.x.Item2 != l && t.y.Item2 != r)
.Do(t => { l = t.x.Item2; r = t.y.Item2; })
.Select(t => selector(t.x.Item1, t.y.Item1));
});
}
Run Code Online (Sandbox Code Playgroud)
我昨晚在同样的问题上苦苦挣扎,相信我找到了一个更优雅(或至少更短)的解决方案:
var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
var throttledSource = source.Take(1).Concat(delay).Repeat();
Run Code Online (Sandbox Code Playgroud)
这是我在Rx论坛中发布的这个问题的答案:
更新:这是一个新版本,当事件发生时间差超过一秒时,不再延迟事件转发:
public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
return Observable.CreateWithDisposable<T>(o =>
{
object gate = new Object();
Notification<T> last = null, lastNonTerminal = null;
DateTime referenceTime = DateTime.UtcNow - minInterval;
var delayedReplay = new MutableDisposable();
return new CompositeDisposable(source.Materialize().Subscribe(x =>
{
lock (gate)
{
var elapsed = DateTime.UtcNow - referenceTime;
if (elapsed >= minInterval && delayedReplay.Disposable == null)
{
referenceTime = DateTime.UtcNow;
x.Accept(o);
}
else
{
if (x.Kind == NotificationKind.OnNext)
lastNonTerminal = x;
last = x;
if (delayedReplay.Disposable == null)
{
delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
{
lock (gate)
{
referenceTime = DateTime.UtcNow;
if (lastNonTerminal != null && lastNonTerminal != last)
lastNonTerminal.Accept(o);
last.Accept(o);
last = lastNonTerminal = null;
delayedReplay.Disposable = null;
}
}, minInterval - elapsed);
}
}
}
}), delayedReplay);
});
}
Run Code Online (Sandbox Code Playgroud)
这是我之前的尝试:
var source = Observable.GenerateWithTime(1,
x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
.Timestamp();
source.Publish(o =>
o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
16228 次 |
最近记录: |