jav*_*dba 5 scala apache-spark parquet
当写入 a dataframeto parquet使用时partitionBy:
df.write.partitionBy("col1","col2","col3").parquet(path)
Run Code Online (Sandbox Code Playgroud)
我期望正在写入的每个分区都是由单独的任务独立完成的,并且与分配给当前 Spark 作业的工作人员数量并行。
然而,在写入镶木地板时,实际上一次只有一个工作程序/任务在运行。该工作人员循环遍历每个分区并串行写出文件.parquet。为什么会出现这种情况 - 有没有办法强制执行此spark.write.parquet操作并发?
下面的不是我想看到的(应该是700%+..)
从另一篇文章中我也尝试repartition在前面添加
df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)
Run Code Online (Sandbox Code Playgroud)
不幸的是,这没有任何效果:仍然只有一名工人..
local注意:我在模式下运行,local[8]并且看到其他Spark 操作与多达 8 个并发工作线程一起运行,并且使用了高达 750% 的 cpu。
简而言之,从单个任务写入多个输出文件不是并行的,但假设您有多个任务(多个输入拆分),每个任务都会在工作线程上获得自己的核心。
写出分区数据的目的不是并行化您的写入操作。Spark 已经通过同时写出多个任务来做到这一点。目标只是优化未来的读取操作,您只需要保存数据的一个分区。
Spark 中写入分区的逻辑旨在将前一阶段的所有记录写入目的地时仅读取一次。我相信设计选择的一部分也是为了防止分区键具有许多值的情况。
编辑:Spark 2.x 方法
在 Spark 2.x 中,它按分区键对每个任务中的记录进行排序,然后迭代它们一次写入一个文件句柄。我认为他们这样做是为了确保如果分区键中有很多不同的值,他们永远不会打开大量文件句柄。
作为参考,排序如下:
向下滚动一点,您将看到它write(iter.next())在每一行中循环调用。
这是实际的写入(一次一个文件/分区键):
在那里您可以看到它一次只打开一个文件句柄。
编辑:Spark 1.x 方法
Spark 1.x 对于给定的任务所做的是循环遍历所有记录,每当遇到该任务之前未见过的新输出分区时,都会打开一个新的文件句柄。然后它立即将记录写入该文件句柄并转到下一个。这意味着在处理单个任务时的任何给定时间,最多可以为该任务打开 N 个文件句柄,其中 N 是输出分区的最大数量。为了更清楚地说明这一点,这里有一些 python 伪代码来展示总体思路:
# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
partition_path = determine_output_path(row, partition_keys)
if partition_path not in handles:
handles[partition_path] = open(partition_path, 'w')
handles[partition_path].write(row)
Run Code Online (Sandbox Code Playgroud)
上述写出记录的策略有一个警告。在 Spark 1.x 中,该设置spark.sql.sources.maxConcurrentWrites对每个任务可以打开的掩码文件句柄设置了上限。达到这一目标后,Spark 会按分区键对数据进行排序,这样它就可以迭代一次写出一个文件的记录。