如何将两个已排序的Observable合并为一个已排序的Observable?

Zhe*_*lov 16 java system.reactive rx-java

鉴于:

Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);
Run Code Online (Sandbox Code Playgroud)

如何获得包含的Observable 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17

Tim*_*lds 4

编辑:如果您要使用此功能,请参阅 the_joric 的评论。有一个边缘情况未处理,我没有找到快速修复它的方法,所以我现在没有时间修复它。

这是 C# 中的解决方案,因为您有标签system.reactive

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
    var source = Observable.Merge(
        a.Select(x => Tuple.Create('a', x)),
        b.Select(y => Tuple.Create('b', y)));
    return source.Publish(o =>
    {
        var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
        var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
        return Observable.Merge(
            published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
            published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
    });
}
Run Code Online (Sandbox Code Playgroud)

该想法总结如下。

  • a发出值时x,我们将其延迟,直到发出这样的b值。yx <= y

  • b发出值时y,我们将其延迟,直到发出这样的a值。xy <= x

如果您只有热可观察量,您可以执行以下操作。但如果混合中存在任何冷可观测值,则以下内容将不起作用。我建议始终使用适用于热和冷可观测值的版本。

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
    return Observable.Merge(
        a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
        b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}
Run Code Online (Sandbox Code Playgroud)