I'm writing a library to integrate Apache Spark with a custom environment, for which I'm implementing custom streaming sources and streaming writers.
Some of the sources I'm developing are not recoverable, at least not after an application crash: if the application restarts, all the data has to be reloaded. We therefore want to spare users from having to set the 'checkpointLocation' option explicitly. But when that option is not provided, we see the following error:
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
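For context, here is a minimal sketch of how such a query is started. The format names "my-custom-source" and "my-custom-sink" are placeholders for our own implementations, not real Spark identifiers:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("custom-streaming-demo")
      .master("local[*]")
      .getOrCreate()

    // "my-custom-source" is a placeholder for our custom streaming source
    val df = spark.readStream
      .format("my-custom-source")
      .load()

    // Throws the AnalysisException above: no checkpointLocation option is set
    // and spark.sql.streaming.checkpointLocation is not configured.
    val query = df.writeStream
      .format("my-custom-sink")
      .start()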
However, if I stream the output to the console sink, everything works fine.
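That is, the identical query starts without any checkpoint location when written to the built-in console sink:

    // Works even though no checkpointLocation is given
    df.writeStream
      .format("console")
      .start()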
Is there a way to get the same behavior with our custom writer?
Note: we use the Spark v2 interfaces for our stream readers/writers.
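One workaround we could imagine (our own idea, untested, not something Spark documents for this purpose) is to generate a throwaway checkpoint directory on the user's behalf, since the source cannot recover across restarts anyway:

    import java.nio.file.Files

    // Sketch of a possible workaround (assumption, not our preferred solution):
    // create a temporary checkpoint directory and pass it along silently.
    val tmpCheckpoint = Files.createTempDirectory("stream-checkpoint-").toUri.toString

    df.writeStream
      .format("my-custom-sink")                      // placeholder name
      .option("checkpointLocation", tmpCheckpoint)
      .start()

But we would rather understand how the console sink avoids the requirement altogether.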
Spark logs:
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
...
18/06/29 …