Cor*_*rey 5 python distributed hadoop-yarn apache-spark pyspark
这是我能想到的最简单的DataFrame.我正在使用PySpark 1.6.1.
# one row of data
rows = [ (1, 2) ]
cols = [ "a", "b" ]
df = sqlContext.createDataFrame(rows, cols)
Run Code Online (Sandbox Code Playgroud)
所以数据框完全适合内存,没有对任何文件的引用,对我来说看起来很微不足道.
然而,当我收集数据时,它使用2000个执行程序:
df.collect()
Run Code Online (Sandbox Code Playgroud)
在收集期间,使用2000执行者:
[Stage 2:===================================================>(1985 + 15) / 2000]
Run Code Online (Sandbox Code Playgroud)
然后是预期的输出:
[Row(a=1, b=2)]
Run Code Online (Sandbox Code Playgroud)
为什么会这样?DataFrame不应该完全在驱动程序的内存中吗?
所以我稍微研究了一下代码,试图弄清楚发生了什么。看起来确实sqlContext.createDataFrame没有做出任何类型的尝试来根据数据设置合理的参数值。
为什么有 2000 个任务?
Spark 使用 2000 个任务,因为我的数据框有 2000 个分区。(尽管分区多于行似乎显然是无稽之谈。)
这可以通过以下方式看出:
>>> df.rdd.getNumPartitions()
2000
Run Code Online (Sandbox Code Playgroud)
为什么 DataFrame 有 2000 个分区?
发生这种情况是因为sqlContext.createDataFrame最终使用默认数量的分区(在我的例子中为 2000),无论数据如何组织或有多少行。
代码轨迹如下。
在 中sql/context.py,sqlContext.createDataFrame函数调用(在本例中):
rdd, schema = self._createFromLocal(data, schema)
Run Code Online (Sandbox Code Playgroud)
这又调用:
return self._sc.parallelize(data), schema
Run Code Online (Sandbox Code Playgroud)
该sqlContext.parallelize函数定义在context.py:
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
Run Code Online (Sandbox Code Playgroud)
不对行数进行检查,并且无法指定 中的切片数sqlContext.createDataFrame。
如何更改 DataFrame 的分区数量?
使用DataFrame.coalesce。
>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
376 次 |
| 最近记录: |