小编Big*_*Guy的帖子

Spark结构化流-插入现有Hive表中,具有可扩展性,没有错误

这似乎是一件很简单的事情,您可能会认为Spark开发人员会以某种方式构建此功能,但我找不到。我见过或尝试过的选项是:

  1. 写入Parquet文件并创建外部配置单元表,但我想插入现有的配置单元内部表中。我知道我可以仅使用另一个Spark作业来定期添加此数据来添加此数据,但这并不理想

  2. 写一个ForeachWriter我尝试过但很少成功的习惯:

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", hostPort).option("subscribe", "test").load()
    val lines = ds1.selectExpr("CAST(value AS STRING)").as[String]
    val words = lines.flatMap(_.split(" "))
    
    import org.apache.spark.sql.ForeachWriter
    val writer = new ForeachWriter[String] {
      import org.apache.spark.sql.SparkSession
      override def open(partitionId: Long, version: Long) = true
      override def process(value: String) = {
          val sparksess = SparkSession.builder.master("local[*]").enableHiveSupport().getOrCreate()
          import sparksess.implicits._
          val valData = List(value).toDF
          valData.write.mode("append").insertInto("stream_test") 
          sparksess.close
          }
      override def close(errorOrNull: Throwable) = {}
    }
    val q = words.writeStream.queryName("words-app").foreach(writer)
    val query = q.start()
    
    Run Code Online (Sandbox Code Playgroud)

注意:在Open方法中创建Sparksession并在close方法中关闭会话似乎很明显,但是这些操作不止一次迭代,然后失败了

注意:上面的代码甚至在第一次迭代后都可以使用,但是抛出了Spark错误,使我不满意该解决方案可以扩展:

18/03/15 11:10:52 ERROR cluster.YarnScheduler: …
Run Code Online (Sandbox Code Playgroud)

hive apache-spark spark-structured-streaming

5
推荐指数
0
解决办法
1283
查看次数