将Spark数据帧保存为Hive中的动态分区表

Che*_*lal 30 hadoop hive apache-spark apache-spark-sql spark-dataframe

我有一个示例应用程序正在从csv文件读取数据帧.可以使用该方法将数据帧以镶木地板格式存储到Hive表中 df.saveAsTable(tablename,mode).

上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表.

有没有办法动态分区数据帧并将其存储到配置单元仓库.想要避免使用硬编码插入语句hivesqlcontext.sql(insert into table partittioin by(date)....).

问题可以视为以下内容的扩展:如何将DataFrame直接保存到Hive?

任何帮助深表感谢.

小智 34

我能够使用分区的hive表写入 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

我必须启用以下属性才能使其正常工作.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

  • @VrushankDoshi您可以在创建hiveContext后立即在spark程序中设置它.val sparkConf = new SparkConf()val sc = new SparkContext(sparkConf)val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)hiveContext.setConf("hive.exec.dynamic.partition","true" )hiveContext.setConf("hive.exec.dynamic.partition.mode","nonstrict") (2认同)

mdu*_*ant 31

我相信它的工作原理如下:

df 是包含年,月和其他列的数据框

df.write.partitionBy('year', 'month').saveAsTable(...)
Run Code Online (Sandbox Code Playgroud)

要么

df.write.partitionBy('year', 'month').insertInto(...)
Run Code Online (Sandbox Code Playgroud)

  • 好的,所以能够用1.4版本来解决它..df.write()模式(SaveMode.Append).partitionBy( "日期")saveAsTable( "表名"); .但是,这会将我的日期字段更改为整数值并删除实际日期.例如,列中有9个唯一日期,但它们现在存储为1,2,3 ....文件夹名称为date = 1,2,3,...而不是date = 20141121.如果有办法,请告诉我. (6认同)
  • @subramaniam-ramasubramanian:请回复 OP 的问题作为答案,而不是编辑现有答案 (2认同)

小智 7

我也面对同样的事情,但使用了我解决的以下技巧.

  1. 当我们将任何表分区为分区时,分区列将区分大小写.

  2. 分区列应存在于具有相同名称的DataFrame中(区分大小写).码:

    var dbName="your database name"
    var finaltable="your table name"
    
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    
    Run Code Online (Sandbox Code Playgroud)


wie*_*u_p 5

可以SparkSession这样配置:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

或者您可以将它们添加到 .properties 文件中

spark.hadoopSpark 配置需要该前缀(至少在 2.4 中),以下是 Spark 设置此配置的方式:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
Run Code Online (Sandbox Code Playgroud)