Spark 结构化流恰好一次 - 未实现 - 重复事件

Ale*_*sky 6 apache-kafka apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka

我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。

检查点在 S3 上提交:

DataFrameWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");
Run Code Online (Sandbox Code Playgroud)

偏移量通过StreamingQueryListener以下方式提交给 Kafka :

  kafkaConsumer.commitSync(topicPartitionMap);
Run Code Online (Sandbox Code Playgroud)

应用程序启动后,它会从 Kafka 检索偏移量映射并启动流:

 reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)
Run Code Online (Sandbox Code Playgroud)

我将topic/partition/offset数据存储在 ORC 文件中。

数据包含具有精确 的事件的多个重复项topic/partition/offset

应如何配置流以实现恰好一次处理?

Ale*_*sky 5

发现这些参数应该设置为true spark.streaming.driver.writeAheadLog.closeFileAfterWritespark.streaming.receiver.writeAheadLog.closeFileAfterWrite

当您想将 S3 用于元数据 WAL 时,将此设置为“true”

https://spark.apache.org/docs/latest/configuration.html

更多详情:https : //www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=IwAR17x1AfTLH1pjq1QPkDsQT6DU4hgi7WNeIYUnw25Hvquoj-4yQU10R0GeM