Java 8流和RxJava可观察量之间的差异

rah*_*lrv 137 observable java-8 rx-java java-stream

Java 8流是否类似于RxJava observables?

Java 8流定义:

java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.

Kir*_*kov 142

TL; DR:所有序列/流处理库都为管道构建提供了非常相似的API.不同之处在于用于处理多线程和管道组合的API.

RxJava与Stream完全不同.在所有JDK事物中,最接近rx.Observable的可能是java.util.stream.Collector Stream + CompletableFuture组合(其代价是处理额外的monad层,即必须处理Stream<CompletableFuture<T>>和之间的转换CompletableFuture<Stream<T>>).

Observable和Stream之间存在显着差异:

  • Streams是基于拉式的,Observables是基于推送的.这可能听起来过于抽象,但它具有非常具体的重大后果.
  • 流只能使用一次,Observable可以多次订阅
  • Stream#parallel()将序列拆分为分区,Observable#subscribeOn()Observable#observeOn()不是; Stream#parallel()使用Observable 模拟行为是很棘手的,它曾经有.parallel()方法,但是这个方法引起了很多混乱,.parallel()支持被移动到github上的独立存储库RxJavaParallel.更多细节在另一个答案中.
  • Stream#parallel()不允许指定要使用的线程池,这与大多数接受可选Scheduler的RxJava方法不同.由于JVM中的所有流实例使用相同的fork-join池,因此添加.parallel()可能会意外地影响程序的另一个模块中的行为
  • Streams缺乏与时间相关的操作Observable#interval(),Observable#window()以及许多其他操作; 这主要是因为Streams是基于拉力的
  • 与RxJava相比,Streams提供有限的操作集.例如Streams缺乏截止操作(takeWhile(),takeUntil()); 解决方法使用Stream#anyMatch()受到限制:它是终端操作,因此每个流不能多次使用它
  • 从JDK 8开始,没有Stream#zip操作,这有时非常有用
  • Streams很难自己构建,Observable可以通过多种方式构建 EDIT:如评论中所述,有一些方法可以构建Stream.但是,由于没有非终端短路,你不能轻易地在文件中生成线条流(JDK提供开箱即用的文件#行和BufferedReader#行,其他类似的场景可以通过构建流来管理来自Iterator).
  • Observable提供资源管理设施(Observable#using()); 你可以用它包装IO流或互斥,并确保用户不会忘记释放资源 - 它将在订阅终止时自动处理; Stream有onClose(Runnable)方法,但您必须手动或通过try-with-resources调用它.E. g.你必须记住,Files#lines()必须包含在try-with-resources块中.
  • Observable一直在同步(我实际上没有检查Streams是否也是如此).这使您不必考虑基本操作是否是线程安全的(答案总是"是",除非有错误),但无论您的代码是否需要,并发相关的开销都将存在.

综述:RxJava与Streams显着不同.Real RxJava替代品是ReactiveStreams的其他实现,例如Akka的相关部分.

更新.使用非默认的fork-join池有诀窍Stream#parallel,请参阅Java 8并行流中的自定义线程池

更新.以上所有内容均基于RxJava 1.x的经验.既然RxJava 2.x在这里,这个答案可能已经过时了.

  • 为什么Streams难以构建?根据这篇文章,它似乎很容易:http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html (2认同)
  • 有很多类都有'stream'方法:集合,输入流,目录文件等.但是如果你想从自定义循环创建一个流 - 比如迭代数据库游标呢?到目前为止,我发现的最好的方法是创建一个Iterator,用Spliterator包装它,最后调用StreamSupport#fromSpliterator.对于一个简单的案例恕我直言的太多胶水.还有Stream.iterate,但它产生无限的流.在这种情况下切断sream的唯一方法是Stream#anyMatch,但它是一个终端操作,因此你不能将流生产者和消费者分开 (2认同)
  • RxJava有Observable.fromCallable,Observable.create等等.或者你可以安全地产生无限的Observable,然后说'.takeWhile(condition)',你可以将这个序列传递给消费者 (2认同)

dwu*_*sen 48

Java 8 Stream和RxJava看起来非常相似.它们具有相似的运算符(filter,map,flatMap ...),但不是为相同的用法而构建的.

您可以使用RxJava执行asynchonus任务.

使用Java 8流,您将遍历集合中的项目.

你可以在RxJava中做同样的事情(遍历集合的项目),但是,由于RxJava专注于并发任务,...,它使用同步,锁存,......所以使用RxJava的相同任务可能比用Java 8流.

可以比较RxJava CompletableFuture,但是它可以计算多于一个值.

  • 值得注意的是,关于流遍历的说法仅适用于非并行流.`parallelStream`支持简单遍历/映射/过滤等的类似同步. (12认同)
  • @ marcin-koziński你可以查看这个基准:https://twitter.com/akarnokd/status/752465265091309568 (6认同)
  • 我不认为"所以使用RxJava的相同任务可能比使用Java 8流更慢." 普遍存在,它严重依赖于手头的任务. (2认同)

aka*_*okd 35

存在一些技术和概念上的差异,例如,Java 8流是单用的,基于拉的,同步的值序列,而RxJava Observable是可重新观察的,自适应地基于推挽的,可能是异步的值序列.RxJava针对Java 6+,也适用于Android.

  • 涉及RxJava的典型代码大量使用lambda,它只能从Java 8开始使用.所以你可以在Java 6中使用Rx,但代码会很吵 (4认同)
  • @KirillGamazkov你可以使用[retrolambda](https://github.com/orfjackal/retrolambda)使你的代码在面向Java 6时更漂亮. (2认同)

Bar*_*ter 29

Java 8 Streams是基于拉取的.您遍历消耗每个项目的Java 8流.它可能是一个无穷无尽的流.

RXJava Observable默认是基于推送的.您订阅了一个Observable,当下一个项目到达(onNext),或者流完成(onCompleted)或发生错误(onError)时,您将收到通知.因为Observable你收到onNext,onCompleted,onError事件,你可以做一些强大的功能,如不同的组合Observables到一个新的(zip,merge,concat).你可以做的其他事情是缓存,限制......它在不同语言中使用或多或少相同的API(RxJava,C#中的RX,RxJS,......)

默认情况下,RxJava是单线程的.除非您开始使用Scheduler,否则一切都将在同一个线程上发生.


Ada*_*hes 25

现有的答案是全面和正确的,但缺乏明确的初学者榜样.请允许我在"推/拉"和"可重新观察"等术语背后加上一些具体内容. 注意:我讨厌这个术语Observable(这是天堂般的流),所以只是简单地引用J8和RX流.

考虑一个整数列表,

digits = [1,2,3,4,5]
Run Code Online (Sandbox Code Playgroud)

J8 Stream是一个用于修改集合的实用程序.例如偶数可以提取为,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

这基本上是Python的map,filter,reduce,一个非常好的(并且早就应该)加入Java.但是如果没有提前收集数字怎么办 - 如果数字在应用程序运行时流入,那么我们可以实时过滤偶数.

想象一下,一个单独的线程进程在app运行时随机输出整数(---表示时间)

digits = 12345---6------7--8--9-10--------11--12
Run Code Online (Sandbox Code Playgroud)

在RX中,even可以每个新数字作出反应并实时应用滤波器

even = -2-4-----6---------8----10------------12
Run Code Online (Sandbox Code Playgroud)

无需存储输入和输出列表.如果你想要一个输出列表,也没有可流动的问题.事实上,一切都是流.

evens_stored = even.collect()  
Run Code Online (Sandbox Code Playgroud)

这就是"无状态"和"功能"等术语与RX更相关的原因