Flink作业失败,原因为:java.io.IOException:rpc调用大小超出最大akka帧大小

cut*_*995 2 apache-flink

Flink作业失败,错误信息如下

2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy41.submitTask(Unknown Source)
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:77)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$9(Execution.java:735)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 7 more
Caused by: java.io.IOException: The rpc invocation size exceeds the maximum akka framesize.
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:270)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
    ... 11 more

Run Code Online (Sandbox Code Playgroud)

这个作业的逻辑很简单,Kafka的消费数据保存到Clickhouse中。

启动命令

2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy41.submitTask(Unknown Source)
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:77)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$9(Execution.java:735)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 7 more
Caused by: java.io.IOException: The rpc invocation size exceeds the maximum akka framesize.
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:270)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
    ... 11 more

Run Code Online (Sandbox Code Playgroud)

这是为什么?谢谢

Bri*_*low 5

该异常意味着消息的有效负载(JM向TM提交任务)超过了最大大小。akka.framesize尝试通过添加到来增加最大大小flink-conf.yaml

默认值为:10485760b。尝试为此设置一个更大的数字。可能需要重启JM/TM或Flink集群。

文档: https: //ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#akka-framesize

  • @RyanShao Flink 使用 AKKA 进行不同服务(Master/Executor/等)之间的内部通信。AFAIK,Flink 中的 AKKA 使用堆外内存,因此要注意任务执行器的堆外使用情况。 (2认同)