CompletableFuture,Future和RxJava的Observable之间的区别

shi*_*455 173 java multithreading asynchronous java-8 rx-java

我想知道的区别 CompletableFuture,FutureObservable RxJava.

我所知道的都是异步但是

Future.get() 阻止线程

CompletableFuture 给出了回调方法

RxJava Observable--- CompletableFuture与其他好处相似(不确定)

例如:如果客户端需要进行多次服务调用,那么当我们使用Futures(Java)时Future.get()将按顺序执行...想知道它在RxJava中的表现如何...

文档http://reactivex.io/intro.html

很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同).当然,这可以完成,但它很快变得复杂(因此容易出错)或者过早地阻塞Future.get(),这消除了异步执行的好处.

真的很想知道如何RxJava解决这个问题.我发现从文档中很难理解.

Mal*_*alt 250

期货

Java 5(2004)中介绍了期货.他们基本上是一个尚未发生的结果的占位符.他们承诺一旦操作完成,就会保留一些操作的结果.该操作可以是提交给ExecutorServiceRunnableCallable实例.操作的提交者可以使用该对象来检查操作是否为Done(),或者等待它使用get()完成.Future

例:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}
Run Code Online (Sandbox Code Playgroud)

CompletableFutures

CompletableFutures在Java 8(2014)中引入.它们实际上是常规期货的演变,受到Google的可听期货的启发,这是Guava图书馆的一部分.它们是Futures,它还允许您将任务串联在一起.您可以使用它们来告诉某些工作线程"去做一些任务X,当你完成后,使用X的结果去做其他事情".这是一个简单的例子:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}
Run Code Online (Sandbox Code Playgroud)

RxJava

RxJava是Netflix创建的反应式编程的完整库.一目了然,它看起来与Java 8的流类似.它是,除了它更强大.

与Futures类似,RxJava可用于将一堆同步或异步操作串联在一起以创建处理管道.与一次性使用的Futures不同,RxJava适用于零个或多个项目的.包括具有无限数量项目的永无止境的流.由于拥有令人难以置信的丰富运营商,它也更加灵活和强大.

与Java 8的流不同,RxJava还具有背压机制,允许它处理处理管道的不同部分以不同速率在不同线程中运行的情况.

RxJava的缺点是,尽管有可靠的文档,但由于涉及范式的转变,它是一个具有挑战性的库.Rx代码也可能是调试的噩梦,特别是如果涉及多个线程,甚至更糟 - 如果需要背压.

如果你想进入它,官方网站上有一整各种教程,加上官方文档Javadoc.您还可以看看一些影片,如这一个这给出了一个简短的介绍到Rx和也谈到了Rx和期货之间的差异.

奖励:Java 9 Reactive Streams

Java 9的Reactive Streams aka Flow API是由各种反应流库(如RxJava 2,Akka StreamsVertx)实现的一组接口.它们允许这些反应库互连,同时保留所有重要的背压.

  • 最好能给出 Rx 如何执行此操作的示例代码 (2认同)
  • @费德里科当然。每个“Future”都是一个“单个”结果的占位符,该结果可能已完成,也可能尚未完成。如果再次执行相同的操作,您将获得一个新的“Future”实例。RxJava 处理随时可能出现的结果“流”。因此,一系列操作可以返回单个 RxJava 可观察值,该可观察值将产生一堆结果。这有点像单个邮政信封和不断抽出邮件的气动管之间的区别。 (2认同)

小智 19

我从0.9开始使用Rx Java,现在是1.3.2,很快就转移到2.x我在一个已经工作了8年的私人项目中使用它.

我根本不会在没有这个库的情况下编程.一开始我很怀疑,但这是你需要创造的另一种完整的心态.一开始很安静.我有时候看着弹珠几个小时..哈哈

这只是一个实践问题而且真正了解流程(也就是观察者和观察者的合同),一旦你到达那里,你就会讨厌这样做.

对我来说,这个图书馆并没有真正的缺点.

使用案例:我有一个监视器视图,包含9个仪表(cpu,mem,network等...).启动视图时,视图会将自身订阅到系统监视器类,该类返回包含9米所有数据的可观察(间隔).它将每秒推送一个新结果到视图(所以不要轮询!!!).该observable使用flatmap同时(异步!)从9个不同的源获取数据,并将结果压缩到您的视图将在onNext()上获得的新模型.

你将如何用期货,补充等来做到这一点......祝你好运!:)

Rx Java为我解决了编程中的许多问题,并且在某种程度上简化了...

好处:

  • Statelss !!! (重要的是要提,最重要的可能)
  • 线程管理开箱即用
  • 构建具有自己生命周期的序列
  • 一切都是可观察的,所以链接很容易
  • 少写代码
  • 类路径上的单个jar(非常轻量级)
  • 高度并发
  • 再也没有回调了
  • 基于订户(消费者和生产者之间的紧密合同)
  • 背压策略(断路器等)
  • 出色的错误处理和恢复
  • 非常好的文档(大理石<3)
  • 完全控制
  • 还有很多 ...

缺点: - 难以测试

  • 〜"**我不会在没有这个库的情况下编程.**"所以RxJava是所有软件项目的最终目标吗? (11认同)
  • 即使我没有异步事件流,它是否有用? (2认同)

小智 7

JavaFuture是一个占位符,用于保存将来将通过阻塞 API 完成的事情。您必须使用它的isDone()方法定期轮询它以检查该任务是否完成。当然,您可以实现自己的异步代码来管理轮询逻辑。然而,它会产生更多的样板代码和调试开销。

Java 的CompletableFuture创新是 Scala 的未来。它带有一个内部回调方法。一旦完成,回调方法将被触发并告诉线程应该执行下游操作。这就是为什么它有thenApply方法对包装在CompletableFuture.

RxJavaObservableCompletableFuture. 它允许您处理背压。在我们上面提到的thenApply方法(甚至与其兄弟thenApplyAsync)中,可能会发生这种情况:下游方法想要调用有时可能不可用的外部服务。在这种情况下,CompleteableFuture将完全失败,您必须自行处理错误。但是,Observable一旦外部服务可用,您就可以处理背压并继续执行。

另外,还有一个类似的界面ObservableFlowable。它们是为不同的目的而设计的。通常Flowable专门用于处理冷的、不定时的操作,而Observable专门用于处理需要即时响应的执行。请参阅此处的官方文档:https://github.com/ReactiveX/RxJava#backPressure


Ale*_*dov 5

所有三个接口都用于将值从生产者转移到消费者。消费者可以分为两种:

  • 同步:消费者进行阻塞调用,当值准备好时返回
  • 异步:当值准备好时,调用消费者的回调方法

此外,通信接口在其他方面也有所不同:

  • 能够传输多个值的单个值
  • 如果多个值,可支持或不支持背压

因此:

  • Future使用同步接口传输单个值

  • CompletableFuture使用同步和异步接口传输单个值

  • Rx使用具有背压的异步接口传输多个值

此外,所有这些通信设施都支持传输异常。这并非总是如此。例如,BlockingQueue没有。