Spark Streaming: avoiding the checkpoint location check

ale*_*z00 5 java scala apache-spark spark-streaming spark-structured-streaming

I'm writing a library to integrate Apache Spark with a custom environment, and I'm implementing custom streaming sources and streaming writers.

Some of the sources I'm developing are not recoverable, at least not after an application crash: if the application restarts, all the data has to be reloaded. We would therefore like to spare our users from having to set the 'checkpointLocation' option explicitly. But if that option is not provided, we see the following error:

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

However, everything works fine if I use the console streaming sink.

Is there a way to get the same behavior for my writer?

Note: we are using the Spark v2 interfaces for the stream readers/writers.
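For reference, the error message itself names the two accepted escape hatches. A minimal sketch of a workaround, assuming the library controls the SparkSession (the directory name below is illustrative), is to point the session-wide default at a throwaway temp directory before any query starts:

import java.nio.file.Files

// Illustrative workaround: set the session-wide default checkpoint location
// to a throwaway temp directory so users never pass the option themselves.
val tempCheckpointDir = Files.createTempDirectory("mylib-checkpoints").toString
spark.conf.set("spark.sql.streaming.checkpointLocation", tempCheckpointDir)
// Each query started afterwards gets its own UUID subdirectory under this path.

Note that, unlike a genuine temporary checkpoint, this directory is not cleaned up automatically when a query stops.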


Spark log:

18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
    ...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

This is how I start the streaming job:

spark.readStream().format("mysource").load()
  .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

Everything works fine if, for example, I run:

spark.readStream().format("mysource").load()
  .writeStream().format("console").outputMode(OutputMode.Append()).start();

I can't share the full code of my data writer, but it is something along these lines:

// Spark 2.3.x DataSourceV2 imports
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
  override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
    new MyStreamWriter(...)
  }
  // Name used in .format("mywriter")
  override def shortName(): String = {
    "mywriter"
  }
}

class MyStreamWriter(...) extends StreamWriter {
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  override def createWriterFactory(): DataWriterFactory[Row] = {
    new MyDataWriterFactory()
  }
}
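For completeness, here is a hypothetical sketch of the factory referenced above (MyDataWriterFactory and MyDataWriter are not shown in the question; the signatures follow the Spark 2.3.x DataSourceV2 writer API):

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

class MyDataWriterFactory() extends DataWriterFactory[Row] {
  // Instantiated on the driver, shipped to executors, and invoked once per
  // partition attempt.
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new MyDataWriter(partitionId)
}

class MyDataWriter(partitionId: Int) extends DataWriter[Row] {
  def write(record: Row): Unit = { /* push the row to the custom sink */ }
  // WriterCommitMessage is a marker interface; an empty instance suffices here.
  def commit(): WriterCommitMessage = new WriterCommitMessage {}
  def abort(): Unit = { /* discard any buffered writes */ }
}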

vaq*_*han 7

You need to add the checkpointLocation in your code:

option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory

Example:

\n\n
import org.apache.spark.sql.streaming.{OutputMode, Trigger}\nimport scala.concurrent.duration._\nval q = records.\n  writeStream.\n  format("console").\n  option("truncate", false).\n  option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory\n  trigger(Trigger.ProcessingTime(10.seconds)).\n  outputMode(OutputMode.Update).\n  start\n
Run Code Online (Sandbox Code Playgroud)\n\n

You have the following three options for your problem:

.option("startingOffsets", "latest") // read data from the end of the stream

  • earliest: start reading from the beginning of the stream. This excludes data that has already been deleted from Kafka because it is older than the retention period ("aged out" data).
  • latest: start now, processing only new data that arrives after the query has started.
  • per-partition assignment: specify the exact offset to start from for each partition, allowing fine-grained control over exactly where processing should begin. For example, to pick up precisely where some other system or query left off, this is the option to use (see the sketch after this list).
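As a sketch of the third option with the Kafka source (the broker address and topic name are illustrative), startingOffsets accepts a JSON map of per-partition offsets, where -2 means earliest and -1 means latest:

// Start partition 0 of "topic1" at offset 23 and partition 1 at the
// earliest available offset. Requires the spark-sql-kafka-0-10 artifact.
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .load()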

createQuery reports an AnalysisException when no directory name for the checkpoint location can be found:

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

Here is the Apache Spark code:

private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }
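This is also why the console sink passes the check: DataStreamWriter.start() starts it with useTempCheckpointLocation enabled, so createQuery falls back to Utils.createTempDir() instead of throwing. The following is a paraphrase of the relevant call, not a verbatim quote; the exact shape varies across Spark 2.x versions:

// Paraphrased from DataStreamWriter.start(): the console sink is the
// hard-coded exception that tolerates a missing checkpoint location.
df.sparkSession.sessionState.streamingQueryManager.startQuery(
  extraOptions.get("queryName"),
  extraOptions.get("checkpointLocation"),
  df,
  extraOptions.toMap,
  sink,
  outputMode,
  useTempCheckpointLocation = source == "console", // <-- temp dir fallback
  recoverFromCheckpointLocation = true,
  trigger = trigger)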

  • Hi, I found the answer: this behavior is hard-coded in Spark's DataStreamWriter start() method. Thanks for your help. (5 upvotes)
  • Thanks for your answer. But what I was actually wondering is whether the check on the checkpointLocation option can be avoided, since it apparently isn't needed when I use the console sink. I'm at least trying to understand the reasoning behind this behavior. (2 upvotes)