用于顺序代码的 Java CompletableFuture

Hen*_*rre 8 java asynchronous nonblocking grpc completable-future

我的新团队正在编写一个 Java gRPC 服务,为了确保我们永远不会阻塞请求线程,我们最终将或多或少的所有方法包装在 CompletableFuture 中,即使这些端点在概念上是操作的顺序列表(无并行性)。

\n

所以代码看起来像这样(如果需要,最后可以提供 Java 示例):

\n
  methodA()\n    methodB()\n      methodD() (let say this one is a 15ms RPC call)\n      methodE()\n    methodC()\n      methodF() (let say this one is a 5ms CPU intensive work)\n      methodG()\n \n
Run Code Online (Sandbox Code Playgroud)\n

语境:

\n
    \n
  • 在实践中,应用程序要大得多,并且有更多的功能层
  • \n
  • 每个应用程序主机需要处理 1000 QPS,因此您可以想象以该速率调用 methodA
  • \n
  • 某些函数(少数)进行 RPC 调用,可能需要 5-30 毫秒 (IO)
  • \n
  • 某些功能(很少)运行 CPU 密集型工作(< 5ms)
  • \n
\n

编辑1:昨天在网上进行了更多阅读后,我明白,当且仅当我们使用真正的非阻塞HTTP和DB客户端(并且看起来JDBC不是非阻塞的)时,这种模式可以减少所需的线程总数。我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现(减少切换线程和加载数据的开销),但是如果我们没有足够的内存为了保持那么多线程处于活动状态,那么使整个代码成为非阻塞的概念可以减少线程数量,从而允许应用程序扩展到更多请求。

\n

问题一: \n我知道这会解锁“请求线程”,但实际上有什么好处?我们真的节省了 CPU 时间吗?在下面的示例中,感觉“某些”线程无论如何都会一直处于活动状态(在下面的示例中,主要是来自 methodD 中 CompletableFuture.supplyAsync 的线程),只是碰巧它\xe2\x80\x99s 不一样线程作为接收初始请求的线程。

\n

问题 2: \n该模式确实是“最佳实践”并且所有服务都应遵循类似的模式吗?我觉得除了让代码有点难以阅读之外,每个请求都会调用 50 多个方法,并且我们会调用 CompletableFuture.thenCompose().supplyAsync. 看起来这会增加一些开销。是否被CompletableFuture设计为在每个方法的整个代码库中都以这种方式使用?

\n

附件(java示例):

\n
  public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {\n    methodA(10)\n        .thenApply((response) -> responseObserver.next(response));\n    \n  }\n\n  public CompletableFuture<Integer> methodA(Integer input) {\n    return CompletableFuture.completedFuture(input)\n        .thenCompose(this::methodB)\n        .thenCompose(this::methodC)\n        .thenApply((i) -> {\n          System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));\n          return i;\n        });\n  }\n\n  public CompletableFuture<Integer> methodB(Integer input) {\n    return CompletableFuture.completedFuture(input)\n        .thenCompose(this::methodD)\n        .thenCompose(this::methodE)\n        .thenApply((i) -> {\n          System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));\n          return i;\n        });\n  }\n\n  public CompletableFuture<Integer> methodC(Integer input) {\n    return CompletableFuture.completedFuture(input)\n        .thenCompose(this::methodF)\n        .thenCompose(this::methodG)\n        .thenApply((i) -> {\n          System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));\n          return i;\n        });\n  }\n\n  public CompletableFuture<Integer> methodD(Integer input) {\n    return CompletableFuture.supplyAsync(() -> {\n      try {\n        // Assume it\'s a RPC call that takes 5-30ms\n        Thread.sleep(20);\n        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));\n      } catch (InterruptedException e) {\n        throw new RuntimeException(e);\n      }\n      return input + 1;\n    });\n  }\n\n  public CompletableFuture<Integer> methodE(Integer input) {\n    return CompletableFuture.supplyAsync(() -> {\n      System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));\n      return input + 1;\n    });\n  }\n\n  public CompletableFuture<Integer> methodF(Integer input) {\n    return CompletableFuture.supplyAsync(() -> {\n      try {\n        // Let\'s assume it\'s a CPU intensive work that takes 2-5ms\n        Thread.sleep(5);\n        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));\n      } catch (InterruptedException e) {\n        throw new RuntimeException(e);\n      }\n      return input + 1;\n    });\n  }\n\n  public CompletableFuture<Integer> methodG(Integer input) {\n    return CompletableFuture.supplyAsync(() -> {\n      System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));\n      return input + 1;\n    });\n  }\n
Run Code Online (Sandbox Code Playgroud)\n

Hol*_*ger 3

前提是线程是稀缺资源,这不是线程固有的,而是使用具有配置的最大值的线程池的结果。今天\xe2\x80\x99s 框架使用池的原因是,目前实现的线程非常昂贵,并且创建太多线程可能会导致严重的性能问题。

\n

你写了

\n
\n

我的理解是,如果我们有足够的内存来为每个请求保留一个线程,那么使用同步代码仍然可能是最有效的实现\xe2\x80\xa6

\n
\n

这正在朝着正确的方向发展,但重要的是要记住,可能存在比内存更多的限制。某些操作系统\xe2\x80\x99s 调度程序在处理大量线程时效率会显着降低,有些甚至可能对允许进程创建的线程数有固定限制。

\n

因此,当您通过等待另一个线程来阻塞一个线程时,您就限制了线程池的并行处理能力。如果您使用的是 \xe2\x80\x9ctrue 非阻塞 \xe2\x80\x9d API,或者只是任何返回 future 的现有 API,则这适用。正如您正确指出的那样,通过提交您自己的操作supplyAsync没有意义,因为供应商\xe2\x80\x99s 代码仍然由线程执行。

\n

但是,当您有一个由操作返回的现有 future 时,您应该宁愿链接依赖的处理步骤,而不是通过join和 朋友等待其结果。请注意,调用join()现有的 future 可能会让事情变得比仅仅阻塞线程更糟糕:

\n

当您调用join()a时CompletableFuture,它会尝试弥补该问题。当调用者是 Fork/Join 池的工作线程时,可能会发生以下两种情况之一:

\n
    \n
  • 它可能会尝试获取挂起的作业并就地执行它们,而不是不执行任何操作,类似于awaitQuiescence。\n
      \n
    • 在最好的情况下,它将直接获取您刚刚安排的作业supplyAsync(如果使用相同的池)并执行它,几乎就像您没有执行它一样CompletableFuture(只是消耗更多的堆栈空间)。
    • \n
    • 在最坏的情况下,线程将忙于执行一个长时间运行的、完全不相关的作业,而它\xe2\x80\x99s实际等待的作业早已完成。想象一下如果那个不相关的工作也调用 会发生什么join
    • \n
    \n
  • \n
  • 它最终可能会实际阻塞线程,但使用ForkJoinPool.managedBlock(\xe2\x80\xa6),这可能会启动一个新的工作线程以确保保留 pool\xe2\x80\x99s 配置的并行性。很好地解决了并行性降低的问题,但另一方面,又重新引入了您实际上想通过线程池解决的资源消耗问题。
  • \n
\n

最糟糕的是你甚至无法预测这两种情况中哪一种会发生。

\n
\n

然而,在某些情况下,不利用其他线程来阻塞请求线程是有道理的。最值得注意的是,当请求本身的响应时间很重要并且后台计算的结果独立于初始响应而传递时。这种模式最突出的例子是 GUI 框架的事件调度线程,它必须避免长时间运行的操作,以便能够处理后续的用户输入。

\n
\n

请注意,有一个通用的解决方案正在出现,可以使未来所有链中 99% 都过时。虚拟线程在 JDK\xc2\xa019 中处于预览状态,创建起来很便宜,并且允许为每个请求创建一个线程,就像您在上面引用中设想的那样。当一个虚拟线程被阻塞时,它将为下一个虚拟线程释放底层平台线程,因此没有理由犹豫调用join()任何 future,即使是那些属于 \xe2\x80\x9ctrue non-blocking\xe2\x80 \x9d API。

\n

与这个概念和现状进行互操作的最佳方法是设计返回 future 的方法,而是就地执行操作。\xe2\x80\x99s 仍然可以在必要时设计未来的链,即通过使用.thenApplyAsync(this::inPlaceEvalMethod)而不是.thenCompose(this::futureReturningMethod). 但同时,您可以编写一个简单的顺序版本,只需调用这些方法,即可由虚拟线程执行。事实上,您现在甚至可以添加简单的顺序版本并对两种方法进行基准测试。结果可能会让您的团队成员相信 \xe2\x80\x9c 不阻塞请求线程\xe2\x80\x9d 并不一定是一种改进。

\n