use*_*100 2 scala apache-spark apache-spark-sql
I have 2 large datasets.
First dataset contains about 130 million entries.
The second dataset contains about 40000 entries.
The data is fetched from MySQL tables.
I need to do a cross-join but I am getting
java.sql.SQLException: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)
What is the best optimum technique to do this in Scala?
Following is a snippet of my code:
val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)
Run Code Online (Sandbox Code Playgroud)
df1 is the larger dataset and df2 is the smaller one.
130M*40K = 52 万亿条记录是存储这些数据所需的 52 TB 内存,如果我们假设每条记录是 1 个字节,这肯定是不正确的。如果它多达 64 个字节(我认为这也是一个非常保守的估计),那么您需要 3.32 PB (!) 的内存来存储数据。这是一个非常大的数量,因此除非您有一个非常大的集群和该集群内的非常快的网络,否则您可能需要重新考虑您的算法以使其工作。
话虽如此,当您执行join两个 SQL 数据集/数据帧中的一个时,Spark 用于存储连接结果的分区数由spark.sql.shuffle.partitions属性控制(请参阅此处)。您可能希望将其设置为一个非常大的数字,并将执行程序的数量设置为您能做到的最大数量。然后,您也许可以将处理运行到最后。
此外,您可能需要查看该spark.shuffle.minNumPartitionsToHighlyCompress选项;如果您将其设置为少于 shuffle 分区的数量,您可能会再次获得内存提升。请注意,此选项是一个硬编码常量,直到最近的 Spark 版本才设置为 2000,因此根据您的环境,您只需将其设置spark.sql.shuffle.partitions为大于 2000 的数字即可使用它。
| 归档时间: |
|
| 查看次数: |
2391 次 |
| 最近记录: |