Spark Structured Streaming exception handling

Has*_*air 2 spark-structured-streaming

I am reading data from an MQTT streaming source using the Spark Structured Streaming API.

val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "password")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000").as[(String, Timestamp)]

I convert the streaming data into a case class, Employee:

// json4s is assumed for JSON parsing (DefaultFormats, parse, extract)
import org.json4s._
import org.json4s.native.JsonMethods._

case class Employee(Name: String, Department: String)

val ds = lines.map { row =>
  implicit val format = DefaultFormats
  parse(row._1).extract[Employee]
}
// ....some transformations
df.writeStream
  .outputMode("append")
  .format("es")
  .option("es.resource", "spark/employee")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()

Now some messages on the queue have a structure different from the Employee case class; say some required fields are missing. My streaming job then fails with a field-not-found exception.
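One way to avoid this failure mode is to parse each row defensively into an Either instead of letting a single malformed message kill the query. The sketch below reuses the question's Employee and ErrorMessage case classes; the regex-based field lookup is only a self-contained stand-in for the json4s parse(...).extract[Employee] call, not the actual library API:

```scala
// Sketch: tolerate malformed rows by returning Either instead of throwing.
// The regex "extract" is a stand-in for json4s parsing so this runs standalone.
case class Employee(Name: String, Department: String)
case class ErrorMessage(row: String)

def parseEmployee(row: String): Either[ErrorMessage, Employee] = {
  // Look up a string-valued JSON field by name (simplistic, illustration only).
  def field(name: String): Option[String] =
    ("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"").r
      .findFirstMatchIn(row)
      .map(_.group(1))

  (field("Name"), field("Department")) match {
    case (Some(n), Some(d)) => Right(Employee(n, d))
    case _                  => Left(ErrorMessage(row)) // keep raw row for alerting
  }
}
```

In the stream you could then map each row through parseEmployee and route Right values to the normal transformations and Left values to the alerting path, rather than catching exceptions after the fact.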

Now I want to handle such exceptions and also send an alert notification. I tried adding a try/catch block:

case class ErrorMessage(row: String)

catch {
  case e: Exception =>
    val ds = lines.map { row =>
      implicit val format = DefaultFormats
      parse(row._1).extract[ErrorMessage]
    }
    val error = lines.foreach(row => {
      sendErrorMail(row._1)
    })
}

This fails with Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; mqtt. Any help with this would be much appreciated.

Tom*_*los 5

I think you should use the object returned by the start() method, as described in the Spark streaming documentation. Something like:

val query = df.writeStream. ... .start()
try {
  // If the query has terminated with an exception, it will be rethrown here.
  query.awaitTermination()
} catch {
  case ex: Exception => /* code to send mail */
}

Implementing your own foreach sink could introduce the overhead of frequently opening and closing connections.
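That overhead comes from the foreach sink's open/process/close contract: open() is called once per partition per trigger, so a writer that builds a connection (or an SMTP session) in open() rebuilds it on every micro-batch. A minimal sketch of the contract, using a local trait in place of Spark's org.apache.spark.sql.ForeachWriter so it runs standalone (MailAlertWriter and its behavior are illustrative assumptions, not Spark API):

```scala
// Mirror of the ForeachWriter contract, defined locally so the sketch
// compiles without a Spark dependency.
trait SketchForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean // once per partition per trigger
  def process(value: T): Unit                         // once per row
  def close(errorCause: Throwable): Unit              // at the end of the partition/epoch
}

// Hypothetical alerting writer: counts how often open() runs to make the
// per-micro-batch setup cost visible.
class MailAlertWriter extends SketchForeachWriter[String] {
  var opened = 0
  def open(partitionId: Long, epochId: Long): Boolean = {
    opened += 1 // a real writer would establish an SMTP/HTTP connection here
    true
  }
  def process(value: String): Unit =
    println(s"would send alert for: $value")
  def close(errorCause: Throwable): Unit = ()
}
```

Because every trigger pays that open()/close() cost, catching the failure around awaitTermination() as shown above is usually the cheaper way to send a one-off alert.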