I have the following requirement, currently implemented with RDDs:
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)
The result is:
(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))
How can I do the same thing with a Dataset in Spark 2.0?
I can do it with a custom function, but that feels overly complicated. Is there a simpler, more direct way?
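To make the intended semantics explicit, here is the same grouping sketched with plain Scala collections, no Spark required (the names `test` and `grouped` are just illustrative):

```scala
// The grouping the question asks for: collect all names per city.
val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George"))

// groupBy on the first tuple element, then keep only the second per group;
// encounter order within each group is preserved.
val grouped: Map[String, List[String]] =
  test.groupBy(_._1).mapValues(_.map(_._2).toList).toMap
```

This mirrors what `groupByKey().mapValues(_.toList)` does on the RDD, and is what the Dataset version below reproduces in a distributed setting.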
I would suggest you start by creating a case class:
case class Monkey(city: String, firstName: String)
The case class should be defined outside your main class. Then you can use the toDS function, and use groupBy together with the collect_list aggregation function, as follows:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test)
.map(row => Monkey(row._1, row._2))
.toDS()
.groupBy("city")
.agg(collect_list("firstName") as "list")
.show(false)
The output will be:
+-----------+------------------------+
|city |list |
+-----------+------------------------+
|Los Angeles|[Tom] |
|Detroit |[Michael, Peter, George]|
|Chicago |[David, Andrew] |
|Houston |[John] |
|New York |[Jack] |
+-----------+------------------------+
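If you would rather stay in the typed Dataset API instead of untyped columns, groupByKey plus mapGroups produces the same grouping. This is a sketch reusing `test` and `Monkey` from above, and assuming a Spark 2.0 shell where `spark` (a SparkSession) and `sc` are available:

```scala
import spark.implicits._

// Typed alternative: groupByKey keeps the Dataset[Monkey] strongly typed,
// though mapGroups bypasses Catalyst-optimized aggregates like collect_list.
val grouped = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupByKey(_.city)
  .mapGroups((city, monkeys) => (city, monkeys.map(_.firstName).toList))

grouped.show(false)
```

The trade-off: `collect_list` stays inside Spark SQL's optimizer, while `mapGroups` runs arbitrary Scala per group; prefer the `agg(collect_list(...))` form shown above unless you need the typed result.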
You can always convert back to an RDD by calling the .rdd function.
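For example, a sketch continuing the pipeline above; note that .rdd on an untyped aggregation result yields an RDD[Row]:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.collect_list
import spark.implicits._

// .rdd converts the aggregated DataFrame back into an RDD of Rows
val asRdd: RDD[Row] = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
  .rdd
```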