Observable.Join 示例

NKn*_*rer 1 c# system.reactive

有人可以提供一个如何将 Observable.Join 与两种不同的可观察类型一起使用的示例吗?到目前为止,
我在这里找到的最好的解释是,左边的可观察对象打开一个窗口,而右边的可观察对象用于在该窗口中查找匹配项,这是有意义的。但这是如何运作的呢?该示例只是使用,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区窗口。leftDurationSelectorrightDurationSelectorPublish().RefCount()

Eni*_*ity 5

我认为这是一个很好地展示了发生了什么的例子。

让我们首先生成两个带时间戳的可观测值sourceAsourceB每个观测值以010.0秒之间的随机间隔生成一个值:

Random rnd = new Random();
IObservable<int> source =
    Observable
        .Generate(0, x => true, x => x + 1, x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble()* 10.0));

IObservable<Timestamped<string>> sourceA = source.Select(x => $"A{x}").Timestamp();
IObservable<Timestamped<string>> sourceB = source.Select(x => $"B{x}").Timestamp();
Run Code Online (Sandbox Code Playgroud)

让我们有一个很好的方法来输出值:

Func<Timestamped<string>, string> format =
    x => $"{x.Value} @ {x.Timestamp.LocalDateTime.ToString("HH:mm:ss.f")}";
Run Code Online (Sandbox Code Playgroud)

例如,调用format(x)可能会返回"A1 @ 12:33:09.2"

现在加入:

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");
Run Code Online (Sandbox Code Playgroud)

如果我订阅此查询,我会得到如下结果:

A1 @ 12:33:09.2, B0 @ 12:33:12.4
A3 @ 12:33:23.8,B1 @ 12:33:20.9
A3 @ 12:33:23.8,B2 @ 12:33:25.6
A3 @ 12:33:23.8,B3 @ 12:33:25.9
A4 @ 12:33:30.6,B4 @ 12:33:33.0
A5 @ 12:33:37.9,B5 @ 12:33:35.7
A5 @ 12:33:37.9,B6 @ 12:33:40.3
A5 @ 12:33:37.9,B7 @ 12:33:40.8
A6 @ 12:33:43.3,B6 @ 12:33:40.3
A6 @ 12:33:43.3,B7 @ 12:33:40.8
A7 @ 12:33:44.5,B7 @ 12:33:40.8
A7 @ 12:33:44.5,B8 @ 12:33:44.9
A6 @ 12:33:43.3,B8 @ 12:33:44.9

sourceA我从&获得配对结果,其中任一值在另一个值的几秒sourceB内生成。4.0

现在,如果我只想要一个值,sourceB只要它仅4.0在值 from 后几秒生成sourceA,而不是相反呢?

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(0.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");
Run Code Online (Sandbox Code Playgroud)

给我:

A0 @ 12:41:35.4,B1 @ 12:41:35.5
A1 @ 12:41:42.7,B2 @ 12:41:43.2
A3 @ 12:41:47.6,B3 @ 12:41:51.0
A4 @ 12:41:49.8,B3 @ 12:41:51.0
A7 @ 12:42:00.2,B4 @ 12:42:00.4
A7 @ 12:42:00.2,B5 @ 12:42:02.5
A7 @ 12:42:00.2,B6 @ 12:42:03.5
A8 @ 12:42:04.8, B7 @ 12:42:06.4
A9 @ 12:42:12.3,B8 @ 12:42:15.2
A10 @ 12:42:17.2,B9 @ 12:42:19.7
A11 @ 12:42:19.4,B9 @ 12:42:19.7

请注意,所有“B”都出现在“A”之后。

或者我可以用相反的方式来做——其中的值是在 后但在几秒钟内sourceA生成的。sourceB4.0

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(0.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");
Run Code Online (Sandbox Code Playgroud)

这给了我:

A1 @ 12:43:23.8,B0 @ 12:43:21.4
A1 @ 12:43:23.8,B1 @ 12:43:22.8
A2 @ 12:43:27.9,B2 @ 12:43:27.3
A3 @ 12:43:33.6,B3 @ 12:43:29.6
A4 @ 12:43:36.2,B4 @ 12:43:35.3
A4 @ 12:43:36.2,B5 @ 12:43:35.5
A4 @ 12:43:36.2,B6 @ 12:43:35.9
A5 @ 12:43:43.4,B9 @ 12:43:43.1
A5 @ 12:43:43.4,B8 @ 12:43:40.6
A6 @ 12:43:46.5,B10 @ 12:43:43.8
A6 @ 12:43:46.5,B9 @ 12:43:43.1
A8 @ 12:43:55.5,B12 @ 12:43:52.4
A9 @ 12:44:03.8,B13 @ 12:44:01.2
A12 @ 12:44:09.7, B14 @ 12:44:08.9
A13 @ 12:44:12.8, B14 @ 12:44:08.9
A14 @ 12:44:16.0, B15 @ 12:44:13.3

请注意,所有“A”都出现在“B”之后。

您还可以使用查询语法:

var query =
    from a in sourceA
    join b in sourceB
        on Observable.Timer(TimeSpan.FromSeconds(4.0))
        equals Observable.Timer(TimeSpan.FromSeconds(4.0))
    select $"{format(a)}, {format(b)}";
Run Code Online (Sandbox Code Playgroud)