Big*_*igD 7 hive apache-spark spark-structured-streaming
关于与HIVE表的spark结构化流式集成的一个查询.
我试图做一些火花结构流的例子.
这是我的榜样
 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()
 // Register the dataframe as a Hive table
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")
 query.writeStream.start()
Run Code Online (Sandbox Code Playgroud)
正如您在将数据帧写入hdfs位置时的最后一步所看到的那样,数据未插入到令人兴奋的目录中(我的现有目录中有一些旧数据被"age"分区).
我正进入(状态
spark.sql.AnalysisException:必须使用writeStream start()执行带有流源的查询
你能帮我解释为什么我无法将数据插入到hdfs位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上"插入"操作?
寻找解决方案
Spark Structured Streaming 不支持将流式查询的结果写入 Hive 表。
scala> println(spark.version)
2.4.0
val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame
scala> assert(sq.isStreaming)
scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided
Run Code Online (Sandbox Code Playgroud)
如果不支持目标系统(又名sink),您可以使用foreach 和 foreachBatch 操作(突出显示我的):
在
foreach与foreachBatch操作让您应用在流媒体查询的输出任意操作和写作逻辑。它们的用例略有不同——虽然foreach允许在每一行上自定义写入逻辑,但foreachBatch允许对每个微批次的输出进行任意操作和自定义逻辑。
我认为foreachBatch是你最好的选择。
import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start
Run Code Online (Sandbox Code Playgroud)
还有一个我从未使用过的Apache Hive 仓库连接器,但似乎它可能会有所帮助。