相关疑难解决方法(0)

活动任务是Spark UI中的负数

当使用,我看到了这个:

在此输入图像描述

您可以看到活动任务是否为负数(总任务与已完成任务的差异).

这个错误的来源是什么?


节点我有很多执行者.但是,似乎有一项任务似乎已经空闲(我没有看到任何进展),而另一项相同的任务正常完成.


这也是相关的:邮件我可以确认正在创建许多任务,因为我使用的是1k或2k执行程序.

我得到的错误有点不同:

16/08/15 20:03:38 ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
16/08/15 20:07:18 WARN TaskSetManager: Lost task 20652.0 in stage 4.0 (TID 116652, myfoo.com): FetchFailed(BlockManagerId(61, mybar.com, 7337), shuffleId=0, mapId=328, reduceId=20652, message=
org.apache.spark.shuffle.FetchFailedException: java.util.concurrent.TimeoutException: Timeout waiting for …
Run Code Online (Sandbox Code Playgroud)

python hadoop distributed-computing bigdata apache-spark

21
推荐指数
2
解决办法
2361
查看次数

火花可扩展性:我做错了什么?

我正在使用spark处理数据,它可以处理一天的数据(40G),但是在一周的数据中使用OOM会失败:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")
Run Code Online (Sandbox Code Playgroud)

不同ID的数量小于10k.每个ID都很小int.由于太多执行程序因OOM而失败,因此作业失败.当工作成功时(在小输入上),"myoutput"大约是100k.

  1. 我究竟做错了什么?
  2. 我试着更换saveAsTextFilecollect(因为我实际上希望做一些切片和保存之前在python切割),但在行为上是没有任何区别,同样的失败.这是预期的吗?
  3. 我曾经有过reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])而不是sc.union- 哪个更好?它有什么不同吗?

该集群有25个节点,内存为825GB,其中224个内核.

调用是spark-submit --master yarn --num-executors 50 --executor-memory 5G.

单个RDD有大约140列并且覆盖一小时的数据,因此一周是168(= 7*24)个RDD的并集.

scalability distributed-computing bigdata apache-spark pyspark

6
推荐指数
1
解决办法
967
查看次数

在 DataFrame 联合后管理 Spark 分区

我有一个 Spark 应用程序,需要大量使用它unions,我将在不同时间、不同情况下将大量 DataFrame 合并在一起。我正在努力使这项工作尽可能高效地进行。我对 Spark 还很陌生,我突然想到了一件事:

如果我有dfA具有 X 个分区 ( ) 的 DataFrame 'A' ( ) numAPartitions,并将其联合到dfB具有 Y 个分区 ( ) 的DataFrame 'B' ( numBPartitions) ,那么生成的联合 DataFrame ( unionedDF) 会是什么样子,结果如何分区?

// How many partitions will unionedDF have?
// X * Y ?
// Something else?
val unionedDF : DataFrame = dfA.unionAll(dfB)
Run Code Online (Sandbox Code Playgroud)

对我来说,理解这一点似乎非常重要,因为 Spark 性能似乎严重依赖于 DataFrames 采用的分区策略。因此,如果我左右合并 DataFrame,我需要确保不断管理合并后的 DataFrame 的分区。

我唯一想到的(以便正确管理联合数据帧的分区)是对它们重新分区,然后在联合它们后立即将数据帧持久保存到内存/磁盘:

val unionedDF : DataFrame = dfA.unionAll(dfB) …
Run Code Online (Sandbox Code Playgroud)

partitioning distributed-computing unions apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1万
查看次数