rah*_*lrv 137 observable java-8 rx-java java-stream
Java 8流是否类似于RxJava observables?
Java 8流定义:
新
java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.
Kir*_*kov 142
TL; DR:所有序列/流处理库都为管道构建提供了非常相似的API.不同之处在于用于处理多线程和管道组合的API.
RxJava与Stream完全不同.在所有JDK事物中,最接近rx.Observable的可能是java.util.stream.Collector Stream + CompletableFuture组合(其代价是处理额外的monad层,即必须处理Stream<CompletableFuture<T>>和之间的转换CompletableFuture<Stream<T>>).
Observable和Stream之间存在显着差异:
Stream#parallel()将序列拆分为分区,Observable#subscribeOn()而Observable#observeOn()不是; Stream#parallel()使用Observable 模拟行为是很棘手的,它曾经有.parallel()方法,但是这个方法引起了很多混乱,.parallel()支持被移动到github上的独立存储库RxJavaParallel.更多细节在另一个答案中.Stream#parallel()不允许指定要使用的线程池,这与大多数接受可选Scheduler的RxJava方法不同.由于JVM中的所有流实例都使用相同的fork-join池,因此添加.parallel()可能会意外地影响程序的另一个模块中的行为Observable#interval(),Observable#window()以及许多其他操作; 这主要是因为Streams是基于拉力的takeWhile(),takeUntil()); 解决方法使用Stream#anyMatch()受到限制:它是终端操作,因此每个流不能多次使用它Observable#using()); 你可以用它包装IO流或互斥,并确保用户不会忘记释放资源 - 它将在订阅终止时自动处理; Stream有onClose(Runnable)方法,但您必须手动或通过try-with-resources调用它.E. g.你必须记住,Files#lines()必须包含在try-with-resources块中.综述:RxJava与Streams显着不同.Real RxJava替代品是ReactiveStreams的其他实现,例如Akka的相关部分.
更新.使用非默认的fork-join池有诀窍Stream#parallel,请参阅Java 8并行流中的自定义线程池
更新.以上所有内容均基于RxJava 1.x的经验.既然RxJava 2.x在这里,这个答案可能已经过时了.
dwu*_*sen 48
Java 8 Stream和RxJava看起来非常相似.它们具有相似的运算符(filter,map,flatMap ...),但不是为相同的用法而构建的.
您可以使用RxJava执行asynchonus任务.
使用Java 8流,您将遍历集合中的项目.
你可以在RxJava中做同样的事情(遍历集合的项目),但是,由于RxJava专注于并发任务,...,它使用同步,锁存,......所以使用RxJava的相同任务可能比用Java 8流.
可以比较RxJava CompletableFuture,但是它可以计算多于一个值.
aka*_*okd 35
存在一些技术和概念上的差异,例如,Java 8流是单用的,基于拉的,同步的值序列,而RxJava Observable是可重新观察的,自适应地基于推挽的,可能是异步的值序列.RxJava针对Java 6+,也适用于Android.
Bar*_*ter 29
Java 8 Streams是基于拉取的.您遍历消耗每个项目的Java 8流.它可能是一个无穷无尽的流.
RXJava Observable默认是基于推送的.您订阅了一个Observable,当下一个项目到达(onNext),或者流完成(onCompleted)或发生错误(onError)时,您将收到通知.因为Observable你收到onNext,onCompleted,onError事件,你可以做一些强大的功能,如不同的组合Observables到一个新的(zip,merge,concat).你可以做的其他事情是缓存,限制......它在不同语言中使用或多或少相同的API(RxJava,C#中的RX,RxJS,......)
默认情况下,RxJava是单线程的.除非您开始使用Scheduler,否则一切都将在同一个线程上发生.
Ada*_*hes 25
现有的答案是全面和正确的,但缺乏明确的初学者榜样.请允许我在"推/拉"和"可重新观察"等术语背后加上一些具体内容. 注意:我讨厌这个术语Observable(这是天堂般的流),所以只是简单地引用J8和RX流.
考虑一个整数列表,
digits = [1,2,3,4,5]
Run Code Online (Sandbox Code Playgroud)
J8 Stream是一个用于修改集合的实用程序.例如偶数可以提取为,
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)
这基本上是Python的map,filter,reduce,一个非常好的(并且早就应该)加入Java.但是如果没有提前收集数字怎么办 - 如果数字在应用程序运行时流入,那么我们可以实时过滤偶数.
想象一下,一个单独的线程进程在app运行时随机输出整数(---表示时间)
digits = 12345---6------7--8--9-10--------11--12
Run Code Online (Sandbox Code Playgroud)
在RX中,even可以对每个新数字作出反应并实时应用滤波器
even = -2-4-----6---------8----10------------12
Run Code Online (Sandbox Code Playgroud)
无需存储输入和输出列表.如果你想要一个输出列表,也没有可流动的问题.事实上,一切都是流.
evens_stored = even.collect()
Run Code Online (Sandbox Code Playgroud)
这就是"无状态"和"功能"等术语与RX更相关的原因
| 归档时间: |
|
| 查看次数: |
39629 次 |
| 最近记录: |