Rol*_*ndo 5 scala avro apache-spark
我有一个变量"myrdd",它是一个avro文件,通过hadoopfile加载了10条记录.
当我做
myrdd.first_1.datum.getName()
Run Code Online (Sandbox Code Playgroud)
我可以得到这个名字.问题是,我在"myrdd"中有10条记录.当我做:
myrdd.map(x => {println(x._1.datum.getName())})
Run Code Online (Sandbox Code Playgroud)
它不起作用并且一次打印出一个奇怪的物体.如何迭代所有记录?
Ber*_*ium 14
以下是使用spark-shell类似方案的会话日志.
特定
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
Run Code Online (Sandbox Code Playgroud)
你的问题看起来像
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
Run Code Online (Sandbox Code Playgroud)
所以map只返回另一个RDD(该函数不立即应用,当你真正迭代结果时,该函数被"懒惰地"应用).
因此,当您实现(使用collect())时,您将获得"正常"集合:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
Run Code Online (Sandbox Code Playgroud)
在哪你可以map.请注意,在这种情况下,您在传递给map(the println)的闭包中有副作用,结果println为Unit:):
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
Run Code Online (Sandbox Code Playgroud)
如果collect在最后应用,结果相同:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
Run Code Online (Sandbox Code Playgroud)
但是,如果您只想打印行,可以将其简化为使用foreach:
scala> persons.foreach(t => println(t))
[Justin,19]
Run Code Online (Sandbox Code Playgroud)
正如@RohanAletty在评论中指出的那样,这适用于本地Spark作业.如果作业在群集中运行,collect则还需要:
persons.collect().foreach(t => println(t))
Run Code Online (Sandbox Code Playgroud)
笔记
Iterator课堂上可以观察到相同的行为.更新
至于过滤:位置collect是"坏",如果你collect应用之前可以应用的过滤器.
例如,这些表达式给出相同的结果:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
Run Code Online (Sandbox Code Playgroud)
但第二种情况更糟,因为之前可能已应用过滤器collect.
这同样适用于任何类型的聚合.
| 归档时间: |
|
| 查看次数: |
35395 次 |
| 最近记录: |