gsa*_*ras 6 python hadoop distributed-computing apache-spark pyspark
我正在阅读许多图像,我想研究其中的一小部分以进行显影。结果,我试图了解spark和python如何实现这一目标:
In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'
In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43
In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...
Run Code Online (Sandbox Code Playgroud)
..那么怎么回事?我希望limit()的运行速度比我们的运行速度快得多[2],但事实并非如此*。
下面,我将描述我的理解,并请纠正我,因为显然我缺少了一些东西:
d 是成对的RDD(我从模式中知道),我是在用map函数说的:
i)取每一对(将被命名x并给我photo_id属性)。
ii)这将导致一个新的(匿名)RDD,我们将在其中应用该first()方法,但我不确定该方法的工作方式$,但应该给我该匿名RDD的第一个元素。
在中[3],我们将dRDD 限制为1,这意味着尽管d元素很多,但仅使用1并将map函数仅应用于该元素。本Out [3]应该是由映射创建的RDD。
[4],我希望遵循的逻辑[3]并只打印有限的RDD的唯一元素...不出所料,在查看监视器后,[4]似乎处理了整个数据集,而其他数据集则没有,因此看来我使用的不limit()正确,或者这不是我要寻找的内容:
编辑:
tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()
Run Code Online (Sandbox Code Playgroud)
第一次将给予PipelinedRDD,其作为描述在这里,它实际上并不会做任何动作,只是一个转型。
但是,第二行也将处理整个数据集(事实上,现在的任务数与以前一样多,再加上一个!)。
* [2]立即执行,而[4]仍在运行且> 3小时过去。
$ 由于名称,我在文档中找不到它。
小智 5
根据您的代码,这是Spark 2.0上更简单的测试用例
case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
Run Code Online (Sandbox Code Playgroud)
实际上,Dataset.first等效于Dataset.limit(1).collect,因此请检查以下两种情况的物理计划:
scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
+- *MapElements <function1>, obj#123: int
+- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
+- Scan ExistingRDD[x#74]
scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
+- *MapElements <function1>, obj#130: int
+- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
+- *GlobalLimit 1
+- Exchange SinglePartition
+- *LocalLimit 1
+- Scan ExistingRDD[x#74]
Run Code Online (Sandbox Code Playgroud)
对于第一种情况,它与CollectLimitExec物理运算符中的优化有关。也就是说,它将首先获取第一个分区以获取行的限制数量,在这种情况下,如果不满足,则为1,然后获取更多分区,直到达到所需的限制。因此,通常,如果第一个分区不为空,则只会计算和获取第一个分区。其他分区甚至不会被计算。
但是,在第二种情况下,CollectLimitExec中的优化无济于事,因为先前的限制操作涉及混洗操作。将计算所有分区,并在每个分区上运行LocalLimit(1)以获得1行,然后将所有分区改组为一个分区。CollectLimitExec将从生成的单个分区中获取1行。