RxJava Observable"Iteration"如何工作?

tmn*_*tmn 10 java rx-java

我开始玩RxJava和ReactFX,我对它非常着迷.但是当我在试验时,我有几十个问题而且我一直在研究答案.

我正在观察的一件事(没有双关语意)当然是懒惰的执行.通过下面的探索性代码,我注意到在merge.subscribe(pet -> System.out.println(pet))被调用之前没有任何内容被执行.但令我着迷的是,当我订阅第二个订阅者时merge.subscribe(pet -> System.out.println("Feed " + pet)),它再次解雇了"迭代".

我想要了解的是迭代的行为.它似乎不像stream只能使用一次的Java 8 .它是逐字逐句地通过每String一个并将其作为该时刻的价值发布吗?任何以前被解雇的用户跟随任何新用户接收这些项目是否像新用户一样?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}
Run Code Online (Sandbox Code Playgroud)

MLP*_*CiM 10

Observable<T>代表一个monad,一个链式操作,而不是操作本身的执行.它是描述性语言,而不是您习惯的命令.要执行操作,请执行此操作.subscribe().每次订阅时,都会从头开始创建新的执行流.不要将流与线程混淆,因为订阅是同步执行的,除非您使用.subscribeOn()指定线程更改.observeOn().您将新元素链接到任何现有操作/ monad/Observable以添加新行为,例如更改线程,过滤,累积,转换等.如果您的observable是一项昂贵的操作,您不想在每个订阅上重复,您可以通过使用来防止娱乐.cache().

要将任何异步/同步Observable<T>操作转换为同步内联操作,请使用.toBlocking()将其类型更改为BlockingObservable<T>.而不是.subscribe()它包含用于对每个结果执行操作.forEach()或强制执行操作的新方法.first()

Observable是一个很好的工具,因为它们大多数是*确定性的(相同的输入总是产生相同的输出,除非你做错了),可重用(你可以将它们作为命令/策略模式的一部分发送)并且大多数情况下忽略同意,因为他们不应该依赖共享状态(也就是说做错了).如果你试图将一个基于可观察的库带入命令式语言,或者只是在一个Observable上执行一个你有100%信心并且管理良好的操作,那么BlockingObservables是很好的.

围绕这些原则构建应用程序是一种范式的改变,我无法真正涵盖这个答案.

*与强制性框架集成需要有类似SubjectObservable.create()需要的漏洞.

  • https://github.com/ReactiveX/RxJava/blob/86147542573004f4df84d2a2de83577cf62fe787/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L92 (2认同)