如何将Spark结构化流式DataFrame插入到Hive外部表/位置?

Big*_*igD 7 hive apache-spark spark-structured-streaming

关于与HIVE表的spark结构化流式集成的一个查询.

我试图做一些火花结构流的例子.

这是我的榜样

 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()
Run Code Online (Sandbox Code Playgroud)

正如您在将数据帧写入hdfs位置时的最后一步所看到的那样,数据未插入到令人兴奋的目录中(我的现有目录中有一些旧数据被"age"分区).

我正进入(状态

spark.sql.AnalysisException:必须使用writeStream start()执行带有流源的查询

你能帮我解释为什么我无法将数据插入到hdfs位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上"插入"操作?

寻找解决方案

Jac*_*ski 9

Spark Structured Streaming 不支持将流式查询的结果写入 Hive 表。

scala> println(spark.version)
2.4.0

val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided
Run Code Online (Sandbox Code Playgroud)

如果不支持目标系统(又名sink),您可以使用foreach 和 foreachBatch 操作(突出显示我的):

foreachforeachBatch操作让您应用在流媒体查询的输出任意操作和写作逻辑。它们的用例略有不同——虽然foreach允许在每一行上自定义写入逻辑,但foreachBatch允许对每个微批次的输出进行任意操作和自定义逻辑。

我认为foreachBatch是你最好的选择。

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start
Run Code Online (Sandbox Code Playgroud)

还有一个我从未使用过的Apache Hive 仓库连接器,但似乎它可能会有所帮助。