Spark 2.2引入了Kafka的结构化流媒体源.据我所知,它依靠HDFS检查点目录来存储偏移并保证"完全一次"的消息传递.
但旧的码头(如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示Spark Streaming检查点无法跨应用程序恢复或Spark升级,因此不太可靠.作为一种解决方案,有一种做法是支持在支持MySQL或RedshiftDB等事务的外部存储中存储偏移量.
如果我想将Kafka源的偏移存储到事务DB,我如何从结构化流批处理中获得偏移量?
以前,可以通过将RDD转换为HasOffsetRanges:
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Run Code Online (Sandbox Code Playgroud)
但是使用新的Streaming API,我有一个Dataset,InternalRow我找不到一个简单的方法来获取偏移量.Sink API只有addBatch(batchId: Long, data: DataFrame)方法,我怎么能想得到给定批次ID的偏移量?
offset apache-kafka apache-spark apache-spark-sql spark-structured-streaming
我正在考虑将 kafka 偏移量存储在 Spark Structured Streaming 的 kafka 内部,就像它适用于 DStreams 一样stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges),与我正在寻找的相同,但适用于 Structured Streaming。它支持结构化流吗?如果是,我怎样才能实现它?
我知道使用 hdfs 检查点.option("checkpointLocation", checkpointLocation),但我对内置偏移管理非常感兴趣。
我期望 kafka 仅在没有 Spark hdfs 检查点的情况下存储偏移量。
apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
我正在为我的 Spark 结构化流应用程序构建监控,并且需要获取 Spark 应用程序所使用的某个主题的消费者滞后情况。我相信 Spark 驱动程序必须意识到这种滞后,因为它拥有执行程序的所有元数据。我看不到任何方法可以从任何现有的 Spark 文档或资源中获取此指标。我检查了streaminQueryListener接口,但它的功能也有限,因为我们只能从中获取每个查询指标。
基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。
如何为 Spark 结构化流指定 kafka 消费者的组 ID?
如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?
但是,我在 spark 3.0 中尝试了如下设置。
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …Run Code Online (Sandbox Code Playgroud) scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration