chi*_*tom 9 c# reactive-programming system.reactive
我想使用Reactive Extensions转换一些消息,并在一小段延迟后转发它们.
消息看起来像这样:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
输出看起来像这样:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
Run Code Online (Sandbox Code Playgroud)
有几个要求:
给定一个Observable <InMsg>和一个Send函数:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
Run Code Online (Sandbox Code Playgroud)
我知道我可以使用Select来执行转换.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
Run Code Online (Sandbox Code Playgroud)
您可以使用GroupBy来制作IGroupedObservable,Delay延迟输出,并Switch确保较新的值替换其组中的先前值:
IObservable<InMsg> inMessages;
inMessages
.GroupBy(msg => msg.GroupId)
.Select(group =>
{
return group.Select(groupMsg =>
{
TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here
return Observable.Return(outMsg).Delay(delay);
})
.Switch();
})
.Subscribe(outMsg => Console.Write("OutMsg received"));
Run Code Online (Sandbox Code Playgroud)
关于实现的说明:如果在发送消息后(即延迟之后)到达分组值,则会启动新的延迟
| 归档时间: |
|
| 查看次数: |
2512 次 |
| 最近记录: |