将单个DStream拆分为多个Hive表

luc*_*ucy 6 optimization apache-kafka apache-spark spark-streaming

我正在研究Kafka Spark流媒体项目.Spark流式传输从Kafka获取数据.数据采用json格式.样本输入

{"table":"tableA","Product_ID":"AGSVGF.upf","file_timestamp":"2018-07-26T18:58:08.4485558Z000000000000000","hdfs_file_name":"null_1532631600050","Date_Time":"2018 -07-26T13:45:01.0000000Z","User_Name":"UBAHTSD"}

{"table":"tableB","Test_ID":"FAGS.upf","timestamp":"2018-07-26T18:58:08.4485558Z000000000000000","name":"flink","time":"2018 -07-26T13:45:01.0000000Z","Id":"UBAHTGADSGSCVDGHASD"}

一个JSON字符串是一条消息.有15种类型的JSON字符串区分使用表列.现在我想在Apache Hive中保存这15个不同的JSON.所以我创建了一个dstream,在表格列的基础上,我已经过滤了rdd并保存到Hive中.代码工作正常.但是有些时候很多时候会花很多时间来批量生产.我使用控制输入spark.streaming.kafka.maxRatePerPartition=10.我已将rdd重新划分为9分区,但在Spark UI上,它显示未知的阶段. 在此输入图像描述

这是我的代码.

val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)
dStream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val sparkContext = rdd.sparkContext
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      val hiveContext = getInstance(sparkContext)
          val tableA = rdd.filter(_.contains("tableA"))
          if (!tableA.isEmpty()) {
            HiveUtil.tableA(hiveContext.read.json(tableA))
            tableA.unpersist(true)
          }

          val tableB = rdd.filter(_.contains("tableB"))
          if (!tableB.isEmpty()) {
            HiveUtil.tableB(hiveContext.read.json(tableB))
            tableB.unpersist(true)
          }
          .....
          .... upto 15 tables
          ....

            val tableK = rdd.filter(_.contains("tableK"))
              if (!tableB.isEmpty()) {
                HiveUtil.tableB(hiveContext.read.json(tableK))
                tableB.unpersist(true)
              }

    }

}
Run Code Online (Sandbox Code Playgroud)

我如何优化代码?

谢谢.

cri*_*007 1

纯粹从管理角度来看,我建议您参数化您的作业以接受表名称,然后运行 ​​15 个单独的 Spark 应用程序。还要确保每个应用程序的kafka消费者组都是不同的

通过这种方式,您可以更轻松地监控哪个 Spark 作业的执行效果不如其他作业,并且一个表中的数据倾斜不会导致其他作业出现问题。

目前尚不清楚 Kafka 消息键是什么,但如果使用表作为键生成,那么 Spark 可以与 kafka 分区一起扩展,并且可以保证每个表的所有消息都按顺序排列。

总的来说,我实际上会使用 Kafka Connect 或 Streamsets 写入 HDFS/Hive,而不必编写代码或调整 Spark 设置