ast*_*asz 17 hive apache-spark
上下文
Spark 2.0.1,在集群模式下spark-submit.我正在读取hdfs的镶木地板文件:
val spark = SparkSession.builder
      .appName("myApp")
      .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
      .config("spark.sql.sources.bucketing.enabled", true)
      .enableHiveSupport()
      .getOrCreate()
val df = spark.read
              .format("parquet")
              .load("hdfs://XXX.XX.X.XX/myParquetFile")
我保存df到50个桶的蜂巢表分组userid:
df0.write
   .bucketBy(50, "userid")
   .saveAsTable("myHiveTable")
现在,当我查看hdfs的hive仓库时,/user/hive/warehouse有一个名为的文件夹myHiveTable.里面是一堆part-*.parquet文件.我希望有50个文件.但不,有3201个文件!!!! 每个分区有64个文件,为什么?对于我保存为hive表的不同文件,每个分区有不同数量的文件.所有文件都很小,每个只有几十Kb!
我要补充的,不同的,这个数字userid大约是1 000 000在myParquetFile.
题
为什么文件夹中有3201个文件而不是50个!这些是什么?
当我将此表读回DataFrame并打印分区数时:
val df2 = spark.sql("SELECT * FROM myHiveTable") 
println(df2.rdd.getNumPartitions)
分区数isIt正确50,我确认数据被正确分区userid.
对于我的一个大型数据集3Tb,我创建了一个包含1000个分区的表,这些分区创建了大约数百万个文件!这超出了目录项限制1048576并给出org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException 
题
创建的文件数量取决于什么?
题
有没有办法限制创建的文件数量?
题
我应该担心这些文件吗?df2拥有所有这些文件会损害性能吗?总是说我们不应该创建太多分区,因为它有问题.
题
我发现这个信息HIVE动态分区提示文件的数量可能与映射器的数量有关.建议distribute by在插入蜂巢表时使用.我怎么能在Spark中做到这一点?
题
如果问题确实如上面的链接那样,这里如何在MapR-FS上插入数据后控制hive表的文件号,他们建议使用诸如hive.merge.mapfiles或者hive.merge.mapredfiles在map reduce工作之后合并所有小文件的选项.Spark中有这方面的选项吗?
Rav*_*mar 12
请使用spark sql,它将使用HiveContext将数据写入Hive表,因此它将使用您在表模式中配置的桶数.
 SparkSession.builder().
  config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  config("hive.execution.engine","tez").
  config("hive.exec.max.dynamic.partitions","400").
  config("hive.exec.max.dynamic.partitions.pernode","400").
  config("hive.enforce.bucketing","true").
  config("optimize.sort.dynamic.partitionining","true").
  config("hive.vectorized.execution.enabled","true").
  config("hive.enforce.sorting","true").
  enableHiveSupport().getOrCreate()
spark.sql(s"insert into hiveTableName partition (partition_column) select * from  myParquetFile")
spark的分组实现不符合指定数量的桶大小.每个分区都写入一个单独的文件,因此您最终会为每个存储桶提供大量文件.
请参考此链接https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil
拉维
Bil*_*ang 10
我找到了一个解决方法(在Spark 2.1上).它解决了文件数量问题,但可能会产生一些性能影响.
dataframe
  .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
  .repartition(numBuckets, $"bucket")
  .write
  .format(fmt)
  .bucketBy(numBuckets, "bucketColumn")
  .sortBy("bucketColumn")
  .option("path", "/path/to/your/table")
  .saveAsTable("table_name")
我认为spark的bucketing算法会对bucket列值的MurmurHash3进行正调整.这只是复制该逻辑并重新分区数据,以便每个分区包含存储桶的所有数据.
您可以使用分区+分组执行相同操作.
dataframe
  .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
  .repartition(numBuckets, $"partitionColumn", $"bucket")
  .write
  .format(fmt)
  .partitionBy("partitionColumn")
  .bucketBy(numBuckets, "bucketColumn")
  .sortBy("bucketColumn")
  .option("path", "/path/to/your/table")
  .saveAsTable("table_name")
使用csv格式在本地测试了3个分区和5个存储区(分区和存储区列都只是数字):
$ tree .
.
??? _SUCCESS
??? partitionColumn=0
?   ??? bucket=0
?   ?   ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
?   ??? bucket=1
?   ?   ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
?   ??? bucket=2
?   ?   ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
?   ??? bucket=3
?   ?   ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
?   ??? bucket=4
?       ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
??? partitionColumn=1
?   ??? bucket=0
?   ?   ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
?   ??? bucket=1
?   ?   ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
?   ??? bucket=2
?   ?   ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
?   ??? bucket=3
?   ?   ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
?   ??? bucket=4
?       ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
??? partitionColumn=2
    ??? bucket=0
    ?   ??? part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
    ??? bucket=1
    ?   ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
    ??? bucket=2
    ?   ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
    ??? bucket=3
    ?   ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
    ??? bucket=4
        ??? part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
这是所有3个分区的bucket = 0(你可以看到它们都是相同的值):
$ paste partitionColumn=0/bucket=0/part-00004-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=1/bucket=0/part-00002-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=2/bucket=0/part-00000-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv | head
0   0   0
4   4   4
6   6   6
16  16  16
18  18  18
20  20  20
26  26  26
27  27  27
29  29  29
32  32  32
我实际上喜欢额外的桶索引.但是如果你不这样做,你可以在写入之前删除存储桶列,你将获得每个分区的numBuckets文件数.
| 归档时间: | 
 | 
| 查看次数: | 9816 次 | 
| 最近记录: |