如何获得结构化查询的Kafka偏移量以进行手动和可靠的偏移管理?

dna*_*nko 22 offset apache-kafka apache-spark apache-spark-sql spark-structured-streaming

Spark 2.2引入了Kafka的结构化流媒体源.据我所知,它依靠HDFS检查点目录来存储偏移并保证"完全一次"的消息传递.

但旧的码头(如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示Spark Streaming检查点无法跨应用程序恢复或Spark升级,因此不太可靠.作为一种解决方案,有一种做法是支持在支持MySQL或RedshiftDB等事务的外部存储中存储偏移量.

如果我想将Kafka源的偏移存储到事务DB,我如何从结构化流批处理中获得偏移量?

以前,可以通过将RDD转换为HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    
Run Code Online (Sandbox Code Playgroud)

但是使用新的Streaming API,我有一个Dataset,InternalRow我找不到一个简单的方法来获取偏移量.Sink API只有addBatch(batchId: Long, data: DataFrame)方法,我怎么能想得到给定批次ID的偏移量?

Jac*_*ski 40

Spark 2.2引入了Kafka的结构化流媒体源.据我了解,它依靠HDFS checkpoint dir来存储偏移量并保证"完全一次"的消息传递.

正确.

每个触发器Spark Structured Streaming都会将偏移量保存到offset检查点位置(使用checkpointLocation选项或spark.sql.streaming.checkpointLocationSpark属性定义或随机分配)中的目录,以保证最多只处理一次偏移.该功能称为" 写入前进日志".

检查点位置中的另一个目录是commits已完成的流批处理的目录,每个批处理具有一个文件(文件名为批处理ID).

引用Fault Tolerance Semantics中的官方文档:

为实现这一目标,我们设计了结构化流媒体源,接收器和执行引擎,以可靠地跟踪处理的确切进度,以便通过重新启动和/或重新处理来处理任何类型的故障.假设每个流源都具有偏移(类似于Kafka偏移或Kinesis序列号)以跟踪流中的读取位置.引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围.流式接收器设计为处理重新处理的幂等功能.结合使用可重放的源和幂等接收器,结构化流可以确保在任何失败的情况下端到端完全一次的语义.

每次执行触发器时都会StreamExecution检查目录并"计算"已经处理的偏移量.这给你至少一次语义,总共一次.

但旧文档(...)表示Spark Streaming检查点无法跨应用程序或Spark升级恢复,因此不太可靠.

有一个原因,你称他们"老",不是吗?

它们引用了旧的和(在我看来)死亡的Spark Streaming,它不仅保留了偏移量,而且还保留了导致检查点几乎无法使用的情况的整个查询代码,例如当您更改代码时.

时间已经过去了,结构化流媒体更加谨慎,什么时候检查点.

如果我想将Kafka源的偏移存储到事务DB,我如何从结构化流批处理中获得偏移量?

解决方案可以是实现或以某种方式使用MetadataLog接口来处理偏移检查点.那可能有用.

我怎么能想得到给定批次ID的偏移?

目前还不可能.

我的理解是你将无法做到这一点,因为流式语义对你来说是隐藏的.你根本应该处理这种称为偏移的低级"事物",Spark Structured Streaming使用它来提供一次保证.

引用Michael Armbrust在Spark Spark中使用结构化流媒体轻松,可扩展,容错流处理的演讲中引用Michael Armbrust :

你不应该对流媒体有所了解

在谈话(下一张幻灯片)进一步:

你应该编写简单的查询,Spark应该不断更新答案


一种方式来获得补偿(从任何来源,包括卡夫卡)使用StreamingQueryProgress,你可以使用拦截StreamingQueryListeneronQueryProgress回调.

onQueryProgress(event:QueryProgressEvent):单元在有一些状态更新时被调用(更新摄取率等)

随着StreamingQueryProgress您可以访问sources财产SourceProgress,给你想要的东西.

  • 哇,很好的答案:)但最后一点应该是第一个:)但是,投票是值得的:) (2认同)
  • “你不应该。期间。” - 这不是我要找的答案 :) 如果您遵循 Spark 上的 JIRA 票证,获取偏移量仍然是一个有效的用例。例如,不将自己锁定在 Spark 上会怎样?如果我在外部存储中有偏移量,我可以将我的 ETL 重写到 Apache Flink,然后让它从我的存储中获取最新的偏移量(默认情况下这是可靠的,因为所有数据/偏移量更新都发生在一个事务中) (2认同)
  • @JacekLaskowski我不这么认为.它对正在发生的事情有一个很好的解释,它有助于我获得更好的理解,但部分误导说"你不应该这样做".也许最好说,目前很难做到这一点 - 您要么传递主题/分区并从Source偏移到Sink,要么从checkpoint目录中读取批量ID的偏移量. (2认同)

dna*_*nko 5

相关的 Spark DEV 邮件列表讨论主题在这里

从中总结:

Spark Streaming 将支持在未来版本 (> 2.2.0) 中获取偏移量。要遵循的 JIRA 票证 - https://issues-test.apache.org/jira/browse/SPARK-18258

对于 Spark <= 2.2.0,您可以通过从 checkpoint 目录读取 json 来获取给定批次的偏移量(API 不稳定,所以要小心):

val checkpointRoot = // read 'checkpointLocation' from custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

val endOffset: Map[TopicPartition, Long] = offsetSeqLog.get(batchId).map { endOffset =>
  endOffset.offsets.filter(_.isDefined).map { str =>
    JsonUtilsWrapper.jsonToOffsets(str.get.json)
  }
}


/**
  * Hack to access private API
  * Put this class into org.apache.spark.sql.kafka010 package
  */
object JsonUtilsWrapper {
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
    JsonUtils.partitionOffsets(partitionOffsets)
  }

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
    JsonUtils.partitionOffsets(str)
  }
}
Run Code Online (Sandbox Code Playgroud)

endOffset将包含每个主题/分区的直到偏移量。获取起始偏移量是有问题的,因为您必须阅读“提交”检查点目录。但通常情况下,您并不关心起始偏移量,因为存储结束偏移量足以可靠地重新启动 Spark 作业。

请注意,您还必须将处理过的批次 ID 存储在您的存储中。在某些情况下,Spark 可以使用相同的批次 ID 重新运行失败的批次,因此请确保使用最新处理的批次 ID(您应该从外部存储读取)初始化自定义接收器,并忽略任何 id <latestProcessedBatchId 的批次。顺便说一句,批次 ID 在查询中不是唯一的,因此您必须分别为每个查询存储批次 ID。