Spark结构化流-插入现有Hive表中,具有可扩展性,没有错误

Big*_*Guy 5 hive apache-spark spark-structured-streaming

这似乎是一件很简单的事情,您可能会认为Spark开发人员会以某种方式构建此功能,但我找不到。我见过或尝试过的选项是:

  1. 写入Parquet文件并创建外部配置单元表,但我想插入现有的配置单元内部表中。我知道我可以仅使用另一个Spark作业来定期添加此数据来添加此数据,但这并不理想

  2. 写一个ForeachWriter我尝试过但很少成功的习惯:

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", hostPort).option("subscribe", "test").load()
    val lines = ds1.selectExpr("CAST(value AS STRING)").as[String]
    val words = lines.flatMap(_.split(" "))
    
    import org.apache.spark.sql.ForeachWriter
    val writer = new ForeachWriter[String] {
      import org.apache.spark.sql.SparkSession
      override def open(partitionId: Long, version: Long) = true
      override def process(value: String) = {
          val sparksess = SparkSession.builder.master("local[*]").enableHiveSupport().getOrCreate()
          import sparksess.implicits._
          val valData = List(value).toDF
          valData.write.mode("append").insertInto("stream_test") 
          sparksess.close
          }
      override def close(errorOrNull: Throwable) = {}
    }
    val q = words.writeStream.queryName("words-app").foreach(writer)
    val query = q.start()
    
    Run Code Online (Sandbox Code Playgroud)

注意:在Open方法中创建Sparksession并在close方法中关闭会话似乎很明显,但是这些操作不止一次迭代,然后失败了

注意:上面的代码甚至在第一次迭代后都可以使用,但是抛出了Spark错误,使我不满意该解决方案可以扩展:

18/03/15 11:10:52 ERROR cluster.YarnScheduler: Lost executor 2 on hadoop08.il.nds.com: Container marked as failed: container_1519892135449_0122_01_000003 on host: hadoop08.il.nds.com. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1519892135449_0122_01_000003
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)
        at org.apache.hadoop.util.Shell.run(Shell.java:504)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)


Container exited with a non-zero exit code 50
Run Code Online (Sandbox Code Playgroud)

注意:有一个类似的问题问在这里:但答案并不表示它是怎么做的流上下文并没有回答

任何人都可以帮助修复此代码,以便更好地扩展而不会出现错误?

  1. 写回卡夫卡水槽,然后在使用KafkaConnect卡夫卡写蜂巢状它说,在这里:但是,我想避免这种额外的复杂性。

  2. 写给JDBCSink-这是我的下一个尝试,但似乎对于Spark中的Hive来说并不必要!

  3. 我读过DataBricks正在开发类似的东西-参见这里

但是尚不清楚何时以及是否将其作为标准Spark GA版本的一部分发布。

我们特别感谢Spark开发人员有关将流数据写入现有Hive表的最佳方法的任何帮助。

编辑

我写了一个JDBC Sink扩展了ForeachWriter,它可以工作,但是它非常慢,因为它必须为每一行打开和关闭一个连接!我想尝试创建一个连接池,将重新使用和释放连接,但看,这是不可能的ForeachWriter和我会写我自己的自定义JDBC片要做到这一点,它是复杂的这里

我正在计划2个选项之一。为了简单起见,直到Spark发布内置解决方案之前,将数据写到配置单元外部表可访问的镶木文件中,并定期写入作业以将数据移动到托管表中。2.使用较旧的Spark Streaming DStream,它似乎可以使用ForeachRDD懒惰地创建可以重用的单例会话,但是我认为我将失去结构化流的“恰好一次”功能,并且在这种情况下必须完全重写我的代码功能内置在结构化流中。