加入一个庞大而巨大的火花数据帧

Mat*_*ewH 7 apache-spark spark-dataframe

我有两个数据帧,df1有600万行,df2有10亿.

我已经尝试过该标准df1.join(df2,df1("id")<=>df2("id2")),但内存不足.

df1太大而无法放入广播连接中.

我甚至尝试了一个布隆过滤器,但它也太大了,不适合广播,仍然有用.

我尝试过的唯一没有错误的是将df1分解为300,000行块并在foreach循环中与df2连接.但这比它可能应该的时间长一个数量级(可能因为它太大而不适合作为持久性导致它重新分裂到那一点).重新组合结果也需要一段时间.

你是怎么解决这个问题的?

几点说明:

df1是df2的子集.df1=df2.where("fin<1").selectExpr("id as id2").distinct()我对df2中的所有行感兴趣,这些行的id一次有一个fin <1,这意味着我不能一步完成它.

df2中有大约2亿个唯一ID.

这里有一些相关的火花设置:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000
Run Code Online (Sandbox Code Playgroud)

我得到的错误是:

16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)
Run Code Online (Sandbox Code Playgroud)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory
Run Code Online (Sandbox Code Playgroud)

Igo*_*man 6

正如我所看到的,您有一个分区太大的问题(可能是由于更大的数据),您可以尝试以下几种方法:

  1. 尝试将spark.sql.shuffle.partitions定义为2048或更高(默认值为200)。加入df-s时会出现随机播放。尝试使用此参数,以便更大数据/此参数的总容量约为64Mb-100Mb(取决于文件格式)。通常,您应该在Spark UI中看到每个任务(每个分区)处理“正常”数据量(最大64MB-100MB)

  2. 如果first不起作用,我建议在RDD api中加入。将您的df转换为RDD。然后通过HashPartitioner(分区数)对两个RDD进行分区。什么时候应该像我之前描述的那样计算分区数。

  3. 最近,spark开发人员添加了新的选项:您可以将Ningormous表存储到N个存储桶中(即,将其存储以备联接使用)。目前几乎没有限制,但它可以完全消除混排巨大数据。只有saveAsTable API支持bucketBy,而不保存其中一个。在对数据进行存储并进行存储之后,在下一次迭代中,您可以在提供存储规范的同时将此数据作为外部表加载(请参阅https://docs.databricks.com/spark/latest/spark-sql/language-manual/ create-table.html

    创建巨大的表--...在这里您必须指定模式(使用(a,b,c)聚簇到N个存储桶中)'hdfs:// your-path'

然后,当您将大型表作为桶装表加载时,可以加载大表并将其重新分区到相同数量的桶和相同的列(df.repartition(N,a,b,c))