ast*_*asz 17 hive apache-spark
上下文
Spark 2.0.1,在集群模式下spark-submit.我正在读取hdfs的镶木地板文件:
val spark = SparkSession.builder
.appName("myApp")
.config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
.config("spark.sql.sources.bucketing.enabled", true)
.enableHiveSupport()
.getOrCreate()
val df = spark.read
.format("parquet")
.load("hdfs://XXX.XX.X.XX/myParquetFile")
Run Code Online (Sandbox Code Playgroud)
我保存df到50个桶的蜂巢表分组userid:
df0.write
.bucketBy(50, "userid")
.saveAsTable("myHiveTable")
Run Code Online (Sandbox Code Playgroud)
现在,当我查看hdfs的hive仓库时,/user/hive/warehouse有一个名为的文件夹myHiveTable.里面是一堆part-*.parquet文件.我希望有50个文件.但不,有3201个文件!!!! 每个分区有64个文件,为什么?对于我保存为hive表的不同文件,每个分区有不同数量的文件.所有文件都很小,每个只有几十Kb!
我要补充的,不同的,这个数字userid大约是1 000 000在myParquetFile.
题
为什么文件夹中有3201个文件而不是50个!这些是什么?
当我将此表读回DataFrame并打印分区数时:
val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)
Run Code Online (Sandbox Code Playgroud)
分区数isIt正确50,我确认数据被正确分区userid.
对于我的一个大型数据集3Tb,我创建了一个包含1000个分区的表,这些分区创建了大约数百万个文件!这超出了目录项限制1048576并给出org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
题
创建的文件数量取决于什么?
题
有没有办法限制创建的文件数量?
题
我应该担心这些文件吗?df2拥有所有这些文件会损害性能吗?总是说我们不应该创建太多分区,因为它有问题.
题
我发现这个信息HIVE动态分区提示文件的数量可能与映射器的数量有关.建议distribute by在插入蜂巢表时使用.我怎么能在Spark中做到这一点?
题
如果问题确实如上面的链接那样,这里如何在MapR-FS上插入数据后控制hive表的文件号,他们建议使用诸如hive.merge.mapfiles或者hive.merge.mapredfiles在map reduce工作之后合并所有小文件的选项.Spark中有这方面的选项吗?
Rav*_*mar 12
请使用spark sql,它将使用HiveContext将数据写入Hive表,因此它将使用您在表模式中配置的桶数.
SparkSession.builder().
config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("hive.execution.engine","tez").
config("hive.exec.max.dynamic.partitions","400").
config("hive.exec.max.dynamic.partitions.pernode","400").
config("hive.enforce.bucketing","true").
config("optimize.sort.dynamic.partitionining","true").
config("hive.vectorized.execution.enabled","true").
config("hive.enforce.sorting","true").
enableHiveSupport().getOrCreate()
spark.sql(s"insert into hiveTableName partition (partition_column) select * from myParquetFile")
Run Code Online (Sandbox Code Playgroud)
spark的分组实现不符合指定数量的桶大小.每个分区都写入一个单独的文件,因此您最终会为每个存储桶提供大量文件.
请参考此链接https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil
拉维
Bil*_*ang 10
我找到了一个解决方法(在Spark 2.1上).它解决了文件数量问题,但可能会产生一些性能影响.
dataframe
.withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
.repartition(numBuckets, $"bucket")
.write
.format(fmt)
.bucketBy(numBuckets, "bucketColumn")
.sortBy("bucketColumn")
.option("path", "/path/to/your/table")
.saveAsTable("table_name")
Run Code Online (Sandbox Code Playgroud)
我认为spark的bucketing算法会对bucket列值的MurmurHash3进行正调整.这只是复制该逻辑并重新分区数据,以便每个分区包含存储桶的所有数据.
您可以使用分区+分组执行相同操作.
dataframe
.withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
.repartition(numBuckets, $"partitionColumn", $"bucket")
.write
.format(fmt)
.partitionBy("partitionColumn")
.bucketBy(numBuckets, "bucketColumn")
.sortBy("bucketColumn")
.option("path", "/path/to/your/table")
.saveAsTable("table_name")
Run Code Online (Sandbox Code Playgroud)
使用csv格式在本地测试了3个分区和5个存储区(分区和存储区列都只是数字):
$ tree .
.
??? _SUCCESS
??? partitionColumn=0
? ??? bucket=0
? ? ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
? ??? bucket=1
? ? ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
? ??? bucket=2
? ? ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
? ??? bucket=3
? ? ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
? ??? bucket=4
? ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
??? partitionColumn=1
? ??? bucket=0
? ? ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
? ??? bucket=1
? ? ??? part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
? ??? bucket=2
? ? ??? part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
? ??? bucket=3
? ? ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
? ??? bucket=4
? ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
??? partitionColumn=2
??? bucket=0
? ??? part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
??? bucket=1
? ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
??? bucket=2
? ??? part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
??? bucket=3
? ??? part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
??? bucket=4
??? part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
Run Code Online (Sandbox Code Playgroud)
这是所有3个分区的bucket = 0(你可以看到它们都是相同的值):
$ paste partitionColumn=0/bucket=0/part-00004-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=1/bucket=0/part-00002-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=2/bucket=0/part-00000-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv | head
0 0 0
4 4 4
6 6 6
16 16 16
18 18 18
20 20 20
26 26 26
27 27 27
29 29 29
32 32 32
Run Code Online (Sandbox Code Playgroud)
我实际上喜欢额外的桶索引.但是如果你不这样做,你可以在写入之前删除存储桶列,你将获得每个分区的numBuckets文件数.
| 归档时间: |
|
| 查看次数: |
9816 次 |
| 最近记录: |