我是Scala和Spark的新手,所以希望有人能告诉我这里我出错的地方.
我有一个三列数据集(id,name,year),我想找到每个名字的最近一年.换一种说法:
BEFORE AFTER
| id_1 | name_1 | 2015 | | id_2 | name_1 | 2016 |
| id_2 | name_1 | 2016 | | id_4 | name_2 | 2015 |
| id_3 | name_1 | 2014 |
| id_4 | name_2 | 2015 |
| id_5 | name_2 | 2014 |
Run Code Online (Sandbox Code Playgroud)
我想groupByKey并且reduceGroups会完成工作:
val latestYears = ds
.groupByKey(_.name)
.reduceGroups((left, right) => if (left.year > right.year) left else right)
.map(group => group._2)
Run Code Online (Sandbox Code Playgroud)
但它给出了这个错误,并吐出了很多生成的Java代码:
ERROR CodeGenerator: failed …Run Code Online (Sandbox Code Playgroud) 我正在尝试将一些常见代码提取到抽象类中,但遇到了问题.
假设我正在读取格式为"id | name"的文件:
case class Person(id: Int, name: String) extends Serializable
object Persons {
def apply(lines: Dataset[String]): Dataset[Person] = {
import lines.sparkSession.implicits._
lines.map(line => {
val fields = line.split("\\|")
Person(fields(0).toInt, fields(1))
})
}
}
Persons(spark.read.textFile("persons.txt")).show()
Run Code Online (Sandbox Code Playgroud)
大.这很好用.现在让我们说我想用"名称"字段读取许多不同的文件,因此我将提取出所有常见的逻辑:
trait Named extends Serializable { val name: String }
abstract class NamedDataset[T <: Named] {
def createRecord(fields: Array[String]): T
def apply(lines: Dataset[String]): Dataset[T] = {
import lines.sparkSession.implicits._
lines.map(line => createRecord(line.split("\\|")))
}
}
case class Person(id: Int, name: String) extends Named
object Persons extends …Run Code Online (Sandbox Code Playgroud)