Waq*_*qas 3 apache-spark pyspark
我有大约 12K 二进制文件,每个文件大小为 100mb,包含多个长度可变的压缩记录。我试图找到最有效的方法来读取它们,解压缩,然后以镶木地板格式写回。我的集群有 6 个节点,每个节点有 4 个核心。
此时,使用下面的伪代码,读取所有文件大约需要 8 小时,并且写回 parquet 非常非常慢。
def reader(file_name):
keyMsgList = []
with open(file_name, "rb") as f:
while True:
header = f.read(12)
if not header:
break
keyBytes = header[0:8]
msgLenBytes = header[8:12]
# conver keyBytes & msgLenBytes to int
message = f.read(msgLen)
keyMsgList.append((key, decode(message)))
return keyMsgList
files = os.listdir("/path/to/binary/files")
rddFiles = sc.parallelize(files, 6000)
df = spark.createDataFrame(rddFiles.flatMap(reader), schema)
df.repartition(6000).write.mode("append").partitionBy("key").parquet("/directory")
Run Code Online (Sandbox Code Playgroud)
这里选择 6000 的原因sc.parallelize(files, 6000)是创建每个大小为 200 MB 的分区,即(12k files * 100mb size) / 200MB。由于文件内容的顺序性质需要逐字节读取每个文件内容,不确定读取是否可以进一步优化?同样,当写回parquet时,输入的数字repartition(6000)是为了确保数据均匀分布并且所有执行器可以并行写入。然而,事实证明这是一个非常缓慢的操作。
一种解决方案是增加执行器的数量,这会提高读取性能,但不确定是否会提高写入性能?
正在寻找有关如何提高此处性能的任何建议?
repartitionbut coalesce。看这里。您确定了操作的瓶颈repartition,这是因为您启动了全洗牌。有coalesce你在,就不会这么做。您最终也会得到N 个分区。它们不会像您所得到的那样平衡,repartition但这有关系吗?
我建议你赞成coalesce而不是repartition
您的应用程序运行在 6 个节点、4 个核心上。您有 6000 个分区。这意味着您有大约 250 个核心分区(甚至没有计算给您的 master 的分区)。在我看来,这太过分了。
由于您的分区很小(大约 200Mb),您的 master 可能会花费更多时间等待执行器的应答,而不是执行查询。
我建议你减少分区数量
DataFrame API 操作通常比手动编码的解决方案更快、更好。
也许看看pyspark.sql.functions你是否能在那里找到一些东西(参见这里)。我不知道这是否相关,因为我没有看到你的数据,但这是我根据我的经验所做的一般建议。
| 归档时间: |
|
| 查看次数: |
12288 次 |
| 最近记录: |