我可以在 RichAsyncFunction 中编写同步代码吗

Xpe*_*ria 3 apache-flink flink-streaming

当我需要使用 I/O(查询 DB、调用第三个 API,...)时,我可以使用 RichAsyncFunction。但我需要通过 GG Sheet API 与 Google Sheet 进行交互: https: //developers.google.com/sheets/api/quickstart/java。这个API是同步的。我写了下面的代码片段:

public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {

    @Override
    public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            syncSendToGGSheet(message);
            return "";
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(result));
        });
    }

}
Run Code Online (Sandbox Code Playgroud)

但我发现消息发送到GGSheet非常慢,看起来是同步发送的。

Arv*_*ise 6

用户执行的大部分代码AsyncIO最初都是同步的。您只需要确保它实际上是在单独的线程中执行的。ExecutorService最常见的是使用(静态共享) 。

private class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
   private transient ExecutorService executorService;

   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      executorService = Executors.newFixedThreadPool(30);
   }

   @Override
   public void close() throws Exception {
      super.close();
      executorService.shutdownNow();
   }

   @Override
   public void asyncInvoke(final Obj message, final ResultFuture<String> resultFuture) {
      executorService.submit(() -> {
         try {
            resultFuture.complete(syncSendToGGSheet(message));
         } catch (SQLException e) {
            resultFuture.completeExceptionally(e);
         }
      });
   }
}
Run Code Online (Sandbox Code Playgroud)

以下是有关如何调整 AsyncIO 以提高吞吐量的一些注意事项:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Async-IO-operator-tuning-micro-benchmarks -td35858.html