我正在尝试利用spark分区.我试图做类似的事情
data.write.partitionBy("key").parquet("/location")
这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.
为了避免我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.
写入后我应该如何使用分区来避免许多文件?
INPUT:
输入数据集包含多个文件中的 1000 万笔交易,以镶木地板形式存储。包括所有文件在内的整个数据集的大小范围为 6 到 8GB。
问题陈述:
根据客户 ID 对交易进行分区,这将为每个客户 ID 创建一个文件夹,每个文件夹包含该特定客户完成的所有交易。
HDFS 对根目录中可以创建的子目录数量有 640 万个硬性限制,因此使用客户 ID 的最后两位数字(范围从 00、01、02...到 99)来创建顶级目录和每个顶级目录将包含所有以该特定两位数字结尾的客户 ID。
示例输出目录结构:
00/cust_id=100900/part1.csv 
00/cust_id=100800/part33.csv 
01/cust_id=100801/part1.csv 
03/cust_id=100803/part1.csv
代码:
// Reading input file and storing in cache
val parquetReader = sparksession.read
  .parquet("/inputs")
  .persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory
// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
  var idEndPattern = customerIdEndingPattern + ""
  if (customerIdEndingPattern < 10) {
    idEndPattern = "0" + customerIdEndingPattern
  }
  parquetReader …partitioning hadoop-partitioning apache-spark hadoop2 apache-spark-sql
我有一些分区的配置单元表,它们指向镶木地板文件。现在每个分区都有很多小的镶木地板文件,每个大小约为 5kb,我想将这些小文件合并为每个分区的一个大文件。我怎样才能做到这一点来提高我的蜂巢性能?我尝试将分区中的所有镶木地板文件读取到 pyspark 数据帧,并将组合数据帧重写到同一分区并删除旧的。但出于某种原因,这对我来说似乎效率低下或初学者级别的类型。这样做的利弊是什么?而且,如果有任何其他方法,请指导我在 spark 或 pyspark 中实现它。