jas*_*san 4 hadoop-yarn apache-spark pyspark spark-dataframe
我的程序流程是这样的:
1. 将 40 亿行 (~700GB) 的数据从镶木地板文件读取到数据框中。使用的分区大小为 2296
2. 清理它并过滤掉 25 亿行
3. 使用管道模型和训练模型转换剩余的 15 亿行。该模型使用逻辑回归模型进行训练,其中预测 0 或 1,并且 30% 的数据从转换后的数据框中过滤掉。
4. 上述数据框与另一个约 1 TB 的数据集(也从镶木地板文件中读取)进行左外连接。分区大小为 4000
5. 将其与另一个大约 100 MB 的数据集连接,如
joined_data = data1.join(broadcast(small_dataset_100MB), data1.field == small_dataset_100MB.field, "left_outer")
6. 然后分解上述数据框到 ~2000 的因子exploded_data = joined_data.withColumn('field', explode('field_list'))
7. 执行聚合aggregate = exploded_data.groupBy(*cols_to_select)\
.agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))
cols_to_select列表中共有 10 列。
8. 最后aggregate.count()执行一个动作。
问题是,倒数第三个计数阶段(200 个任务)永远卡在任务 199 处。尽管分配了 4 个内核和 56 个执行程序,但计数仅使用一个内核和一个执行程序来运行作业。我尝试将大小从 40 亿行分解为 7 亿行,这是 1/6 的一部分,花了四个小时。我真的很感激在如何加快这个过程方面的一些帮助谢谢
由于倾斜的数据被加入到一个巨大的数据集中,操作被卡在了最后的任务上。连接两个数据帧的键严重偏斜。现在通过从数据框中删除倾斜的数据解决了这个问题。如果您必须包含倾斜的数据,您可以使用迭代广播连接 ( https://github.com/godatadriven/iterative-broadcast-join )。查看此内容丰富的视频以了解更多详细信息https://www.youtube.com/watch?v=6zg7NTw-kTQ
| 归档时间: |
|
| 查看次数: |
2106 次 |
| 最近记录: |