S. *_*. K 10 apache-spark apache-spark-sql
我需要使用Spark SQL或Dataframe API连接表.需要知道实现它的优化方式.
场景是:
实现这一目标的最佳方法是什么?如果有人遇到类似的问题,请分享您的经验.
Sim*_*Sim 11
关于如何优化连接的默认建议是:
如果可以,请使用广播联接(请参阅此笔记本).从您的问题来看,您的表似乎很大,并且广播连接不是一个选项.
考虑使用一个非常大的集群(你可能认为它更便宜).现在250美元(6/2016)在EC2现货实例市场上购买了大约24小时800核,6Tb RAM和许多SSD.在考虑大数据解决方案的总成本时,我发现人类往往会大大低估他们的时间.
使用相同的分区程序.有关共同分组连接的信息,请参阅此问题.
如果数据量很大和/或您的集群无法增长,甚至上面的(3)导致OOM,请使用两遍方法.首先,重新分区数据并使用分区表(dataframe.write.partitionBy())继续.然后,在循环中串行连接子分区,"追加"到同一个最终结果表.
旁注:上面说"追加",因为在制作中我从不使用SaveMode.Append.它不是幂等的,而且是危险的.我SaveMode.Overwrite深入使用分区表树结构的子树.在2.0.0和1.6.2之前,您必须删除_SUCCESS或元数据文件,否则动态分区发现将会阻塞.
希望这可以帮助.
Bor*_*ris 10
Spark 使用SortMerge 连接来连接大表。它包括对两个表上的每一行进行散列,并将具有相同散列的行混入同一分区。在那里,键在两侧进行排序,并应用 sortMerge 算法。据我所知,这是最好的方法。
要大幅加快排序合并速度,请将大型数据集写入带有预存储和预排序选项(相同分区数量)的 Hive 表,而不是平面 Parquet 数据集。
tableA
.repartition(2200, $"A", $"B")
.write
.bucketBy(2200, "A", "B")
.sortBy("A", "B")
.mode("overwrite")
.format("parquet")
.saveAsTable("my_db.table_a")
tableb
.repartition(2200, $"A", $"B")
.write
.bucketBy(2200, "A", "B")
.sortBy("A", "B")
.mode("overwrite")
.format("parquet")
.saveAsTable("my_db.table_b")
Run Code Online (Sandbox Code Playgroud)
与好处相比,编写预存储/预排序表的开销成本是适中的。
默认情况下,底层数据集仍将是 parquet,但 Hive 元存储(可以是 AWS 上的 Glue 元存储)将包含有关表结构的宝贵信息。由于所有可能的“可连接”行都位于同一位置,Spark 不会对预存储的表进行混洗(节省大量资金!),并且不会对预排序表分区内的行进行排序。
val joined = tableA.join(tableB, Seq("A", "B"))
Run Code Online (Sandbox Code Playgroud)
查看有和没有预存储的执行计划。
这不仅会在连接过程中为您节省大量时间,而且还可以在相对较小的集群上运行非常大的连接,而不会出现 OOM。在亚马逊,我们大多数时候在产品中使用它(仍然有少数情况不需要它)。
要了解有关预装桶/预分类的更多信息:
小智 4
使用哈希分区或范围分区对源进行分区,或者如果您更了解连接字段,则可以编写自定义分区。分区将有助于避免在连接期间重新分区,因为跨表的同一分区的 Spark 数据将存在于同一位置。ORC 肯定会帮助这个事业。如果这仍然导致溢出,请尝试使用比磁盘更快的快子
| 归档时间: |
|
| 查看次数: |
14327 次 |
| 最近记录: |