Zhe*_*lov 16 java system.reactive rx-java
鉴于:
Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);
Run Code Online (Sandbox Code Playgroud)
如何获得包含的Observable 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?
编辑:如果您要使用此功能,请参阅 the_joric 的评论。有一个边缘情况未处理,我没有找到快速修复它的方法,所以我现在没有时间修复它。
这是 C# 中的解决方案,因为您有标签system.reactive。
static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
var source = Observable.Merge(
a.Select(x => Tuple.Create('a', x)),
b.Select(y => Tuple.Create('b', y)));
return source.Publish(o =>
{
var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
return Observable.Merge(
published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
});
}
Run Code Online (Sandbox Code Playgroud)
该想法总结如下。
当a发出值时x,我们将其延迟,直到发出这样的b值。yx <= y
当b发出值时y,我们将其延迟,直到发出这样的a值。xy <= x
如果您只有热可观察量,您可以执行以下操作。但如果混合中存在任何冷可观测值,则以下内容将不起作用。我建议始终使用适用于热和冷可观测值的版本。
static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
return Observable.Merge(
a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2706 次 |
| 最近记录: |