tmn*_*tmn 5 java multithreading reactive-programming rx-java
我想使用reduce()observable上的操作将它映射到Guava ImmutableList,因为我更喜欢标准ArrayList.
Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
.map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));
Run Code Online (Sandbox Code Playgroud)
很简单.但是假设我在某个地方strings与多线程或其他东西并行地安排了observable .这不会破坏reduce()操作并可能导致竞争条件吗?特别是因为那ImmutableList.Builder会容易受到影响吗?
问题在于链的实现之间的共享状态.这是我博客中的陷阱#8 :
假设您对toList()运算符返回的List的性能或类型不满意,并且您想要滚动自己的聚合器而不是它.要进行更改,您希望通过使用现有运算符来执行此操作,并找到运算符reduce():
Observable<Vector<Integer>> list = Observable
.range(1, 3)
.reduce(new Vector<Integer>(), (vector, value) -> {
vector.add(value);
return vector;
});
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
当你运行'测试'调用时,第一个打印你期望的,但第二个打印一个向量,其中范围1-3出现两次,第三个订阅打印9个元素!
问题不在于reduce()运算符本身,而在于它周围的期望.建立链后,传入的新Vector是一个"全局"实例,将在链的所有评估之间共享.
当然,有一种方法可以解决这个问题而不需要为整个目的实现一个操作符(如果你看到前一个CounterOp的潜力,这应该很简单):
Observable<Vector<Integer>> list2 = Observable
.range(1, 3)
.reduce((Vector<Integer>)null, (vector, value) -> {
if (vector == null) {
vector = new Vector<>();
}
vector.add(value);
return vector;
});
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
您需要从null开始并在累加器函数内创建一个向量,该向量现在不在订阅者之间共享.
或者,您可以查看collect()具有初始值的工厂回调的运算符.
这里的经验法则是,每当您看到类似聚合器的运营商采取一些简单的价值时,请谨慎,因为这个"初始值"很可能会在所有订户之间共享,如果您计划使用多个订阅者来使用生成的流,会发生冲突,可能会给你意想不到的结果甚至崩溃.