So, I am trying to enable EXACTLY_ONCE semantics together with checkpointing in my Flink Kafka streaming job.
I could not get it to work, so I tried downloading the test example code from GitHub: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
Running this as-is works fine. However, as soon as I enable checkpointing, I get an error. If I change EXACTLY_ONCE to AT_LEAST_ONCE semantics and enable checkpointing, it also works fine. But as soon as I change it back to EXACTLY_ONCE, I get the error again. A minimal sketch of the kind of setup I mean is shown below.
The exception I get:
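For reference, this is roughly what "EXACTLY_ONCE plus checkpointing" looks like with the universal Kafka connector that the stack trace points to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer). This is only an illustrative sketch, not the original job: the topic names, broker address, consumer group, checkpoint interval, and transaction timeout are placeholder assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EXACTLY_ONCE sinks commit their Kafka transactions on completed
        // checkpoints, so checkpointing must be enabled for the semantic to work.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "exactly-once-demo");       // placeholder group
        // Flink's default transaction timeout (1 hour) exceeds the broker default
        // transaction.max.timeout.ms (15 minutes); keep it within the broker limit.
        props.setProperty("transaction.timeout.ms", "600000");

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        input.addSink(new FlinkKafkaProducer<>(
                "output-topic",
                new StringSerializationSchema("output-topic"),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-kafka-sketch");
    }

    /** Writes each String as the record value of the given topic. */
    private static class StringSerializationSchema implements KafkaSerializationSchema<String> {
        private final String topic;

        StringSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
        }
    }
}
```

(The exact constructor overloads of FlinkKafkaProducer vary between Flink versions; the variant above takes a KafkaSerializationSchema plus a Semantic, as the linked KafkaExample does.)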
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at …
Apache Flink guarantees exactly-once processing across failure and recovery by resuming the job from a checkpoint, which is a consistent snapshot of the distributed data flow and operator state (the Chandy-Lamport algorithm for distributed snapshots). This guarantees exactly-once on failover. A minimal sketch of the configuration that drives this recovery path follows below.
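Purely as an illustration of that recovery path, here is a short sketch of the checkpoint and restart settings on the execution environment; the interval, retention mode, and restart-strategy values are assumptions, not prescribed values:

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRecoverySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic consistent snapshots of operator state, taken with
        // checkpoint barriers flowing through the dataflow graph.
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

        // Retain completed checkpoints so the job can also be resumed manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // On failure, Flink restarts the job and rewinds every operator to the
        // last completed checkpoint (here: up to 3 attempts, 10 s apart).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

        // ... job graph definition and env.execute(...) would follow here.
    }
}
```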
How does Flink guarantee exactly-once processing while the cluster is operating normally (no failures)? For example, given a Flink source that reads from an external system (say, Kafka), how does Flink guarantee that each event is read from that source exactly once? Is there any kind of application-level acknowledgement between the event source and the Flink source? Also, how does Flink guarantee that events are propagated from upstream operators to downstream operators exactly once? Does that also require some kind of acknowledgement of received events?