Vek*_*ksi 6 c# events reactive-programming system.reactive
我进入了一个Rx狂欢,可以这么说,这个问题与我这里和这里有关.然而,也许这些对某些人有帮助,因为我可以将它们视为同一主题的有用变体.
问题:如何将一组随机流int(例如,在随机间隔上产生的间隔[0,10])对象分组并为earch组提供可变数量的事件警报(由于缺乏更好的定义,进一步的背景见链接的帖子).更具体地说,使用代码,如何在下面定义每个组的多重节流设置:
var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));
Run Code Online (Sandbox Code Playgroud)
如果每组缺少多于一秒的ID,则将调用subscribe函数.如果想要为没有事件(例如,一秒,五秒和十秒)定义三个不同的值并且在事件到达时全部取消,该怎么办?我能想到的是:
idStream分成几个合成ID,并提供真实ID和合成ID之间的双射映射.例如,在这种情况下ID:1 - > 100,101,102; ID:2 - > 200,201,203和然后定义一个选择器功能在Throttle像这样Func<int, Timespan>(i => /* switch(i)...*/),然后当Subscribe将被调用,映射ID背面.有关更多背景信息,另请参阅相关问题.在一个更一般的设置中,我怀疑,这是一种情况,每个组有多个处理程序,虽然我没有设法找到任何与此相关的东西.
<编辑:
作为一个(希望澄清)一个例子idStream推送一个ID:1,将启动三个不同的计数器,每个计数器等待下一个事件发生或如果没有及时检测到新的ID 1则报警.计数器1(C1)等待5秒钟,计数器2(C2)等待7秒钟,计数器3(C3)等待10秒钟.如果在[0,5]秒的时间内收到新的ID 1,则所有计数器将使用上述值重新初始化,并且不会发送任何警报.如果在[0,7]秒间隔内收到新ID,则会重新初始化C1报警和C2和C3.类似地,如果在区间[0,10]内接收到新的ID,则C1和C2会激活,但C3才会重新初始化.
也就是说,在给定某些条件的情况下,将存在多个"缺席警报"或一般而言,针对一个ID采取的动作.我不确定什么是好的模拟......也许在塔中叠加"警示灯",这样首先是绿色,然后是黄色,最后是红色.由于ID的缺失持续时间越来越长,颜色之后的颜色将被点亮(在这种情况下,红色是最后一个).然后,当检测到一个ID时,所有灯都将关闭.
<编辑2:
在将詹姆斯的代码改装为示例如下并将其余部分保留为书面时,我发现Subscribe将在两个警报级别的第一个事件上直接调用.
const int MaxLevels = 2;
var idAlarmStream = idStream
.Select(i => i)
.AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
.Subscribe(i =>
{
Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
});
Run Code Online (Sandbox Code Playgroud)
让我们看看这里发生了什么,如果MaxLevels可以动态提供......
<编辑3:詹姆斯的代码有效.问题出在椅子和键盘之间!将时间改为更合理的确定确实有帮助.事实上,我把它们改成了更大的数字,但事实.FromTicks并没有让我逃脱了几分钟.
我认为这是有效的 - 我将在稍后尝试添加更全面的解释.每个警报级别都有一个定义的阈值(每个信号组).这些预计会持续增加.
基本思想是让所有先前级别的信号进入当前级别.第一级是信号本身的"零"级别,在返回警报流之前将其过滤掉.请注意,TSignal密钥需要支持值标识.
我确信有简化的余地!
样品单元测试:
public class AlarmTests : ReactiveTest
{
[Test]
public void MultipleKeyMultipleSignalMultipleLevelTest()
{
var threshold1 = TimeSpan.FromTicks(300);
var threshold2 = TimeSpan.FromTicks(800);
var scheduler = new TestScheduler();
var signals = scheduler.CreateHotObservable(
OnNext(200, 1),
OnNext(200, 2),
OnNext(400, 1),
OnNext(420, 2),
OnNext(800, 1),
OnNext(1000, 1),
OnNext(1200, 1));
Func<int, int> keySelector = i => i;
Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
{
if (level == 1) return threshold1;
if (level == 2) return threshold2;
return TimeSpan.MaxValue;
};
var results = scheduler.CreateObserver<Alarm<int>>();
signals.AlarmSystem(
keySelector,
thresholdSelector,
2,
scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(700, new Alarm<int>(1, 1)),
OnNext(720, new Alarm<int>(2, 1)),
OnNext(1220, new Alarm<int>(2, 2)),
OnNext(1500, new Alarm<int>(1, 1)),
OnNext(2000, new Alarm<int>(1, 2)));
}
[Test]
public void CheckAlarmIsSuppressed()
{
var threshold1 = TimeSpan.FromTicks(300);
var threshold2 = TimeSpan.FromTicks(500);
var scheduler = new TestScheduler();
var signals = scheduler.CreateHotObservable(
OnNext(200, 1),
OnNext(400, 1),
OnNext(600, 1));
Func<int, int> keySelector = i => i;
Func<int, int, TimeSpan> thresholdSelector = (signal, level) =>
{
if (level == 1) return threshold1;
if (level == 2) return threshold2;
return TimeSpan.MaxValue;
};
var results = scheduler.CreateObserver<Alarm<int>>();
signals.AlarmSystem(
keySelector,
thresholdSelector,
2,
scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(900, new Alarm<int>(1, 1)),
OnNext(1100, new Alarm<int>(1, 2)));
}
}
public static class ObservableExtensions
{
/// <summary>
/// Create an alarm system that detects signal gaps of length
/// determined by a signal key and signals alarms of increasing severity.
/// </summary>
/// <typeparam name="TSignal">Type of the signal</typeparam>
/// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals correctly</typeparam>
/// <param name="signals">Input signal stream</param>
/// <param name="keySelector">Function to select a key from a signal for grouping</param>
/// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level.
/// Should return TimeSpan.MaxValue for levels above the highest level</param>
/// <param name="levels">Number of alarm levels</param>
/// <param name="scheduler">Scheduler use for throttling</param>
/// <returns>A stream of alarms each of which contains the signal and alarm level</returns>
public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>(
this IObservable<TSignal> signals,
Func<TSignal, TKey> keySelector,
Func<TKey, int, TimeSpan> thresholdSelector,
int levels,
IScheduler scheduler)
{
var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0))
.Publish()
.RefCount();
for (int i = 0; i < levels; i++)
{
alarmSignals = alarmSignals.CreateAlarmSystemLevel(
keySelector, thresholdSelector, i + 1, scheduler);
}
return alarmSignals.Where(alarm => alarm.Level != 0);
}
private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>(
this IObservable<Alarm<TSignal>> alarmSignals,
Func<TSignal, TKey> keySelector,
Func<TKey, int, TimeSpan> thresholdSelector,
int level,
IScheduler scheduler)
{
return alarmSignals
.Where(alarmSignal => alarmSignal.Level == 0)
.Select(alarmSignal => alarmSignal.Signal)
.GroupByUntil(
keySelector,
grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
.SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level)))
.Merge(alarmSignals);
}
}
public class Alarm<TSignal> : IEquatable<Alarm<TSignal>>
{
public Alarm(TSignal signal, int level)
{
Signal = signal;
Level = level;
}
public TSignal Signal { get; private set; }
public int Level { get; private set; }
private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y)
{
if (ReferenceEquals(x, null))
return false;
if (ReferenceEquals(y, null))
return false;
if (ReferenceEquals(x, y))
return true;
return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level);
}
// Equality implementation added to help with testing.
public override bool Equals(object other)
{
return Equals(this, other as Alarm<TSignal>);
}
public override string ToString()
{
return string.Format("Signal: {0} Level: {1}", Signal, Level);
}
public bool Equals(Alarm<TSignal> other)
{
return Equals(this, other);
}
public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y)
{
return Equals(x, y);
}
public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y)
{
return !Equals(x, y);
}
public override int GetHashCode()
{
return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329);
}
}
Run Code Online (Sandbox Code Playgroud)