什么是在Spark SQL中连接大表的优化方法

S. *_*. K 10 apache-spark apache-spark-sql

我需要使用Spark SQL或Dataframe API连接表.需要知道实现它的优化方式.

场景是:

  1. 所有数据都以ORC格式存在于Hive中(基本数据帧和参考文件).
  2. 我需要加入一个从Hive读取的基本文件(Dataframe)和11-13其他参考文件来创建一个大的内存结构(400列)(大小约1 TB)

实现这一目标的最佳方法是什么?如果有人遇到类似的问题,请分享您的经验.

Sim*_*Sim 11

关于如何优化连接的默认建议是:

  1. 如果可以,请使用广播联接(请参阅此笔记本).从您的问题来看,您的表似乎很大,并且广播连接不是一个选项.

  2. 考虑使用一个非常大的集群(你可能认为它更便宜).现在250美元(6/2016)在EC2现货实例市场上购买了大约24小时800核,6Tb RAM和许多SSD.在考虑大数据解决方案的总成本时,我发现人类往往会大大低估他们的时间.

  3. 使用相同的分区程序.有关共同分组连接的信息,请参阅此问题.

  4. 如果数据量很大和/或您的集群无法增长,甚至上面的(3)导致OOM,请使用两遍方法.首先,重新分区数据并使用分区表(dataframe.write.partitionBy())继续.然后,在循环中串行连接子分区,"追加"到同一个最终结果表.

旁注:上面说"追加",因为在制作中我从不使用SaveMode.Append.它不是幂等的,而且是危险的.我SaveMode.Overwrite深入使用分区表树结构的子树.在2.0.0和1.6.2之前,您必须删除_SUCCESS或元数据文件,否则动态分区发现将会阻塞.

希望这可以帮助.

  • @vikrantrana我没有带宽来做这个但基本的想法很简单:一个大的USING连接可以被分成连接列空间的子集的较小连接的并集.您可以通过对连接的左侧和右侧应用一致的分区来构建子集.例如,如果要加入整数ID,则可以按ID对模块进行分区,例如,`df.withColumn("par_id",id%256).repartition(256,'pa​​r_id).write.partitionBy( "par_id")...`然后迭代`persisted.select('par_id).distinct.collect`加入每个分区+再次持久化.然后结合. (2认同)

Bor*_*ris 10

Spark 使用SortMerge 连接来连接大表。它包括对两个表上的每一行进行散列,并将具有相同散列的行混入同一分区。在那里,键在两侧进行排序,并应用 sortMerge 算法。据我所知,这是最好的方法。

要大幅加快排序合并速度,请将大型数据集写入带有预存储和预排序选项(相同分区数量)的 Hive 表,而不是平面 Parquet 数据集。

tableA
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")   
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_a")


tableb
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")    
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_b")
Run Code Online (Sandbox Code Playgroud)

与好处相比,编写预存储/预排序表的开销成本是适中的。

默认情况下,底层数据集仍将是 parquet,但 Hive 元存储(可以是 AWS 上的 Glue 元存储)将包含有关表结构的宝贵信息。由于所有可能的“可连接”行都位于同一位置,Spark 不会对预存储的表进行混洗(节省大量资金!),并且不会对预排序表分区内的行进行排序。

val joined = tableA.join(tableB, Seq("A", "B"))
Run Code Online (Sandbox Code Playgroud)

查看有和没有预存储的执行计划。

这不仅会在连接过程中为您节省大量时间,而且还可以在相对较小的集群上运行非常大的连接,而不会出现 OOM。在亚马逊,我们大多数时候在产品中使用它(仍然有少数情况不需要它)。

要了解有关预装桶/预分类的更多信息:


小智 4

使用哈希分区或范围分区对源进行分区,或者如果您更了解连接字段,则可以编写自定义分区。分区将有助于避免在连接期间重新分区,因为跨表的同一分区的 Spark 数据将存在于同一位置。ORC 肯定会帮助这个事业。如果这仍然导致溢出,请尝试使用比磁盘更快的快子