Ale*_*sky 6 apache-kafka apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka
我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。
检查点在 S3 上提交:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
Run Code Online (Sandbox Code Playgroud)
偏移量通过StreamingQueryListener
以下方式提交给 Kafka :
kafkaConsumer.commitSync(topicPartitionMap);
Run Code Online (Sandbox Code Playgroud)
应用程序启动后,它会从 Kafka 检索偏移量映射并启动流:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
Run Code Online (Sandbox Code Playgroud)
我将topic/partition/offset
数据存储在 ORC 文件中。
数据包含具有精确 的事件的多个重复项topic/partition/offset
。
应如何配置流以实现恰好一次处理?
发现这些参数应该设置为true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
当您想将 S3 用于元数据 WAL 时,将此设置为“true”
https://spark.apache.org/docs/latest/configuration.html
更多详情:https : //www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=IwAR17x1AfTLH1pjq1QPkDsQT6DU4hgi7WNeIYUnw25Hvquoj-4yQU10R0GeM
归档时间: |
|
查看次数: |
1199 次 |
最近记录: |