优化Spark读写性能

Waq*_*qas 3 apache-spark pyspark

我有大约 12K 二进制文件,每个文件大小为 100mb,包含多个长度可变的压缩记录。我试图找到最有效的方法来读取它们,解压缩,然后以镶木地板格式写回。我的集群有 6 个节点,每个节点有 4 个核心。

此时,使用下面的伪代码,读取所有文件大约需要 8 小时,并且写回 parquet 非常非常慢。

def reader(file_name):
    keyMsgList = []
    with open(file_name, "rb") as f:
        while True:
            header = f.read(12)
            if not header:
                break
            keyBytes = header[0:8]
            msgLenBytes = header[8:12]

            # conver keyBytes & msgLenBytes to int 
            message = f.read(msgLen)
            keyMsgList.append((key, decode(message)))
    return keyMsgList
files = os.listdir("/path/to/binary/files")
rddFiles = sc.parallelize(files, 6000)
df = spark.createDataFrame(rddFiles.flatMap(reader), schema)
df.repartition(6000).write.mode("append").partitionBy("key").parquet("/directory")
Run Code Online (Sandbox Code Playgroud)

这里选择 6000 的原因sc.parallelize(files, 6000)是创建每个大小为 200 MB 的分区,即(12k files * 100mb size) / 200MB。由于文件内容的顺序性质需要逐字节读取每个文件内容,不确定读取是否可以进一步优化?同样,当写回parquet时,输入的数字repartition(6000)是为了确保数据均匀分布并且所有执行器可以并行写入。然而,事实证明这是一个非常缓慢的操作。

一种解决方案是增加执行器的数量,这会提高读取性能,但不确定是否会提高写入性能?

正在寻找有关如何提高此处性能的任何建议?

lin*_*nog 8

建议1:不要使用repartitionbut coalesce

这里。您确定了操作的瓶颈repartition,这是因为您启动了全洗牌。有coalesce你在,就不会这么做。您最终也会得到N 个分区。它们不会像您所得到的那样平衡,repartition但这有关系吗?

我建议你赞成coalesce而不是repartition

建议2:6000个分区可能不是最佳选择

您的应用程序运行在 6 个节点、4 个核心上。您有 6000 个分区。这意味着您有大约 250 个核心分区(甚至没有计算给您的 master 的分区)。在我看来,这太过分了。

由于您的分区很小(大约 200Mb),您的 master 可能会花费更多时间等待执行器的应答,而不是执行查询。

我建议你减少分区数量

建议3:可以使用DataFrame API吗?

DataFrame API 操作通常比手动编码的解决方案更快、更好。

也许看看pyspark.sql.functions你是否能在那里找到一些东西(参见这里)。我不知道这是否相关,因为我没有看到你的数据,但这是我根据我的经验所做的一般建议。