fly*_*all 6 python apache-spark pyspark
我正在 pyspark 2.3 中工作,我正在尝试找出从数据框中获取一些聚合统计信息的最有效方法。
我有一个包含 15 亿条记录的数据框,分布在一个由 10 个节点组成的相对较小的集群中。每个都有 16GB 内存和 4 个核心。我的复制因子设置为 2。
我的数据框可能有 15 列,它们是数据类型的混合,但我只对两列感兴趣 - ID 和 eventDate。我想运行的代码非常简单:
output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
我试图找出执行此操作的最有效方法。ID(我要分组的字段)有 12m 个值,df.rdd.getNumPartitions() 目前为 642。
我最好先将数据框投影到我想要的两列吗?有这么多 ID,我应该先重新分区我的数据集吗?我应该删除重复项吗?我可以在我的 groupby 之前运行这样的事情:
df = df[['ID','eventDate']].drop_duplicates().repartition(x)
Run Code Online (Sandbox Code Playgroud)
或者
df = df[['ID','eventDate']].repartition(x)
Run Code Online (Sandbox Code Playgroud)
我正在努力找出什么可以优化运行时。任何有关预先确定运行时间的指导将不胜感激。如果可能的话,我不想只是“测试一下”,因为我有几个这样的查询要运行,每个查询都需要一段时间。
这可能不是您正在寻找的答案,但此操作的最佳代码正是
output = df.groupby(['ID']). \
agg(F.min('eventDate').alias("firstDate"), F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
Spark 通过首先选择整个操作所需的必要列来优化流程。然后,Spark 对数据进行分区,ID并在每个分区上启动聚合过程。
允许最大数量的执行者肯定会有帮助。我建议(根据你的描述)设置spark.executor.instances=10; spark.executor.memory=10g。12m 值是一个相当大的数量,也许可以尝试增加随机分区的数量 fe spark.sql.shuffle.partitions=400,这样您就不会遇到一些恼人的内存开销异常。
| 归档时间: |
|
| 查看次数: |
10564 次 |
| 最近记录: |