Spark使用python:如何解析Stage x包含一个非常大的任务(xxx KB).建议的最大任务大小为100 KB

use*_*723 30 apache-spark spark-streaming

我刚刚创建了python列表range(1,100000).

使用SparkContext完成以下步骤:

a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])

c = a.zip(b)

>>> [(1, 1), (2, 2), -----]

sum  = sc.accumulator(0)

c.foreach(lambda (x, y): life.add((y-x)))
Run Code Online (Sandbox Code Playgroud)

其中发出如下警告:

ARN TaskSetManager:阶段3包含一个非常大的任务(4644 KB).建议的最大任务大小为100 KB.

如何解决此警告?有没有办法处理大小?而且,它会影响大数据的时间复杂度吗?

Hit*_*ani 9

Spark在任务发送期间本地传送了每个变量的副本.对于大尺寸的此类变量,您可能需要使用广播变量

如果您仍然面临尺寸问题,那么这些数据本身应该是RDD

编辑:更新了链接

  • 只是为了澄清这个答案.在提供的示例中超过100 KB的变量是`[i for i in range(1,100000)]` (2认同)

Jea*_*lie 8

一般的想法是PySpark创建的Java进程数比执行程序多,然后将数据发送到每个进程.如果进程太少,则Java堆空间上会出现内存瓶颈.

在你的情况,具体的错误是您使用创建的RDD sc.parallelize([...])没有指定分区的数量(参数numSlices文档).并且RDD默认为一些太小的分区(可能由单个分区构成).

要解决此问题,只需指定所需的分区数:

a = sc.parallelize([...], numSlices=1000)   # and likewise for b
Run Code Online (Sandbox Code Playgroud)

当您指定越来越多的切片时,您会看到警告消息中指定的大小减小.增加切片数量,直到您不再收到警告消息.例如,得到

Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB
Run Code Online (Sandbox Code Playgroud)

意味着您需要指定更多切片.


处理内存问题时可能有用的另一个提示(但这与警告消息无关):默认情况下,每个执行程序可用的内存为1 GB左右.您可以通过命令行指定更大的金额,例如--executor-memory 64G.

  • 如果你 `import sys, math` 那么 `n = math.ceil(sys.getsizeof(your_list) / 102400)` 将是使所有切片保持在 100KB 以下的最小切片数 (2认同)

Tim*_*sov 6

扩展@leo9r 评论:考虑使用不是 python range,而是sc.range https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext.range

因此,您可以避免将大量列表从驱动程序传输到执行程序。

当然,此类 RDD 通常仅用于测试目的,因此您不希望它们被广播。

  • 在玩具示例中使用 `sc.range` 而不是 `range` 可以工作,但忽略了更普遍的问题(python 和 java 之间如何通信数据) (5认同)