相关疑难解决方法(0)

如何获得结构化查询的Kafka偏移量以进行手动和可靠的偏移管理?

Spark 2.2引入了Kafka的结构化流媒体源.据我所知,它依靠HDFS检查点目录来存储偏移并保证"完全一次"的消息传递.

但旧的码头(如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示Spark Streaming检查点无法跨应用程序恢复或Spark升级,因此不太可靠.作为一种解决方案,有一种做法是支持在支持MySQL或RedshiftDB等事务的外部存储中存储偏移量.

如果我想将Kafka源的偏移存储到事务DB,我如何从结构化流批处理中获得偏移量?

以前,可以通过将RDD转换为HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    
Run Code Online (Sandbox Code Playgroud)

但是使用新的Streaming API,我有一个Dataset,InternalRow我找不到一个简单的方法来获取偏移量.Sink API只有addBatch(batchId: Long, data: DataFrame)方法,我怎么能想得到给定批次ID的偏移量?

offset apache-kafka apache-spark apache-spark-sql spark-structured-streaming

22
推荐指数
2
解决办法
6305
查看次数