我观看了视频,我知道了一般原则 - 即使没有人订阅也会发生热情,冷却发生在"按需".此外,Publish()将冷转换为热,Defer()将冷转换为冷.
但是,我觉得我错过了细节.以下是我想回答的一些问题:
我正在尝试重新排序在不同线程上无序到达的事件.
是否可以创建与这些大理石图匹配的反应式扩展查询:
s1 1 2 3 4
s2 1 3 2 4
result 1 2 3 4
Run Code Online (Sandbox Code Playgroud)
和...
s1 1 2 3 4
s2 4 3 2 1
result 1234
Run Code Online (Sandbox Code Playgroud)
即:仅以版本号顺序发布结果.
我最接近的是每次s1滴答时使用Join打开一个窗口,只有当s2到达时才使用相同的数字关闭它.
像这样:
var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
publishedEvents.Scan(0, (i, o) => i + 1),
expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
_ => Observable.Never<Unit>(),
(@event, expectedVersion) => new {@event,expectedVersion})
.Where(x => x.expectedVersion == x.@event.Version)
.Select(x => x.@event)
.Subscribe(Persist);
Run Code Online (Sandbox Code Playgroud)
但是这对于图表2不起作用.第2组将在s2标记数字2时完成,因此在1之前完成.
是否有意义?可以用Rx完成吗?应该是?
编辑:我想这就像重叠的窗口,后面的窗口在所有前面的窗口关闭之前无法关闭.并且在窗口编号与事件版本号匹配之前,前面的窗口不会关闭.
编辑2:
我现在有这样的东西,但它不是真正的反应性,功能性,线程安全的LINQ启示,我希望(请忽略我的事件现在是JObjects):
var orderedEvents = Observable.Create<JObject>(observer …Run Code Online (Sandbox Code Playgroud)