走向限制大RDD

gsa*_*ras 6 python hadoop distributed-computing apache-spark pyspark

我正在阅读许多图像,我想研究其中的一小部分以进行显影。结果,我试图了解如何实现这一目标:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...
Run Code Online (Sandbox Code Playgroud)

..那么怎么回事?我希望limit()的运行速度比我们的运行速度快得多[2],但事实并非如此*

下面,我将描述我的理解,并请纠正我,因为显然我缺少了一些东西:

  1. d 是成对的RDD(我从模式中知道),我是在用map函数说的:

    i)取每一对(将被命名x并给我photo_id属性)。

    ii)这将导致一个新的(匿名)RDD,我们将在其中应用该first()方法,但我不确定该方法的工作方式$,但应该给我该匿名RDD的第一个元素。

  2. 在中[3],我们将dRDD 限制为1,这意味着尽管d元素很多,但仅使用1并将map函数仅应用于该元素。本Out [3]应该是由映射创建的RDD。

  3. 在中[4],我希望遵循的逻辑[3]并只打印有限的RDD的唯一元素...

不出所料,在查看监视器后,[4]似乎处理了整个数据集,而其他数据集则没有,因此看来我使用的不limit()正确,或者这不是我要寻找的内容:

在此处输入图片说明


编辑:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()
Run Code Online (Sandbox Code Playgroud)

第一次将给予PipelinedRDD,其作为描述在这里,它实际上并不会做任何动作,只是一个转型。

但是,第二行也将处理整个数据集(事实上,现在的任务数与以前一样多,再加上一个!)。


* [2]立即执行,而[4]仍在运行且> 3小时过去。

$ 由于名称,我在文档中找不到它。

小智 5

根据您的代码,这是Spark 2.0上更简单的测试用例

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
Run Code Online (Sandbox Code Playgroud)

实际上,Dataset.first等效于Dataset.limit(1).collect,因此请检查以下两种情况的物理计划:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]
Run Code Online (Sandbox Code Playgroud)

对于第一种情况,它与CollectLimitExec物理运算符中的优化有关。也就是说,它将首先获取第一个分区以获取行的限制数量,在这种情况下,如果不满足,则为1,然后获取更多分区,直到达到所需的限制。因此,通常,如果第一个分区不为空,则只会计算和获取第一个分区。其他分区甚至不会被计算。

但是,在第二种情况下,CollectLimitExec中的优化无济于事,因为先前的限制操作涉及混洗操作。将计算所有分区,并在每个分区上运行LocalLimit(1)以获得1行,然后将所有分区改组为一个分区。CollectLimitExec将从生成的单个分区中获取1行。