如何在 foreachBatch 中使用临时表?

Ked*_*dar 3 apache-spark spark-structured-streaming

我们正在构建一个流平台,在该平台中批量使用 SQL 至关重要。

val query = streamingDataSet.writeStream.option("checkpointLocation", checkPointLocation).foreachBatch { (df, batchId) => {

      df.createOrReplaceTempView("events")

      val df1 = ExecutionContext.getSparkSession.sql("select * from events")

      df1.limit(5).show()
      // More complex processing on dataframes

    }}.trigger(trigger).outputMode(outputMode).start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

抛出的错误是:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: events
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'events' not found in database 'default';
Run Code Online (Sandbox Code Playgroud)

流媒体源是带有水印的 Kafka,无需使用 Spark-SQL,我们就能够执行数据帧转换。Spark版本是2.4.0,Scala是2.11.7。触发器是每 1 分钟一次的处理时间,输出模式是追加。

是否有其他方法可以促进在 foreachBatch 中使用 Spark-sql ?它可以与 Spark 的升级版本一起使用吗?在这种情况下我们应该升级到版本吗?请帮忙。谢谢。

Jac*_*ski 5

tl;dr替换ExecutionContext.getSparkSessiondf.sparkSession.


原因是StreamingQueryException流式查询尝试访问eventsa 中的临时表,SparkSession而 a 对此一无所知,即ExecutionContext.getSparkSession.

唯一注册SparkSessionevents临时表的正是在其中创建的数据帧,SparkSession即。dfdf.sparkSession