我在HDFS中有成千上万的小文件.需要处理稍小的文件子集(也是数千个),fileList包含需要处理的文件路径列表.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 to fileList.size() - 1) {
val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
Run Code Online (Sandbox Code Playgroud)
//一旦退出循环,执行任何操作都会导致由于RDD的长谱系导致的堆栈溢出错误
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) …Run Code Online (Sandbox Code Playgroud) 我有两个数据帧,df1有600万行,df2有10亿.
我已经尝试过该标准df1.join(df2,df1("id")<=>df2("id2")),但内存不足.
df1太大而无法放入广播连接中.
我甚至尝试了一个布隆过滤器,但它也太大了,不适合广播,仍然有用.
我尝试过的唯一没有错误的是将df1分解为300,000行块并在foreach循环中与df2连接.但这比它可能应该的时间长一个数量级(可能因为它太大而不适合作为持久性导致它重新分裂到那一点).重新组合结果也需要一段时间.
你是怎么解决这个问题的?
几点说明:
df1是df2的子集.df1=df2.where("fin<1").selectExpr("id as id2").distinct()我对df2中的所有行感兴趣,这些行的id一次有一个fin <1,这意味着我不能一步完成它.
df2中有大约2亿个唯一ID.
这里有一些相关的火花设置:
spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000
Run Code Online (Sandbox Code Playgroud)
我得到的错误是:
16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)
Run Code Online (Sandbox Code Playgroud)
和
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: …Run Code Online (Sandbox Code Playgroud) 我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:
最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:
df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)
但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。
因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?
PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。