Is it possible to automatically recover from exceptions thrown during query execution?
Context: I'm working on a Spark application that reads data from a Kafka topic, processes it, and writes the output to S3. After running in production for a few days, however, the Spark application hit some network failures against S3, which caused an exception to be thrown and the application to stop. It's also worth mentioning that the application runs on Kubernetes using GCP's Spark k8s Operator.
From what I've seen so far, these exceptions are minor, and simply restarting the application resolves the problem. Can we handle these exceptions and restart the Structured Streaming query automatically? Something like the sketch below is what I have in mind.
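A minimal sketch of a driver-side retry loop, assuming the query is rebuilt and restarted after each `StreamingQueryException`; the broker, topic, and bucket paths are placeholders, not my real config:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryException

object ResilientStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-to-s3")
      .getOrCreate()

    // Retry loop: rebuild and restart the query whenever it dies with a
    // StreamingQueryException (e.g. a transient S3 network failure).
    var keepRunning = true
    while (keepRunning) {
      val query = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // placeholder
        .option("subscribe", "my-topic")                  // placeholder
        .load()
        .writeStream
        .format("parquet")
        .option("path", "s3a://my-bucket/output")                  // placeholder
        .option("checkpointLocation", "s3a://my-bucket/checkpoint") // placeholder
        .start()

      try {
        query.awaitTermination()
        keepRunning = false // query stopped cleanly, exit the loop
      } catch {
        case e: StreamingQueryException =>
          // Log and loop: the checkpoint should let the restarted query
          // resume from the last committed Kafka offsets.
          println(s"Query failed, restarting: ${e.getMessage}")
      }
    }
  }
}
```

My assumption is that the checkpoint location makes this restart safe (the query resumes from the last committed offsets), but I'm not sure whether catching the exception on the driver like this is the recommended approach, or whether it has pitfalls.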
Here is an example of the exception that gets thrown:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: ...
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan: ...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) …