小编kpo*_*kpo的帖子

Spark"CodeGenerator:无法编译"与Dataset.groupByKey

我是Scala和Spark的新手,所以希望有人能告诉我这里我出错的地方.

我有一个三列数据集(id,name,year),我想找到每个名字的最近一年.换一种说法:

BEFORE                                          AFTER
| id_1 | name_1 | 2015 |                        | id_2 | name_1 | 2016 |
| id_2 | name_1 | 2016 |                        | id_4 | name_2 | 2015 |
| id_3 | name_1 | 2014 | 
| id_4 | name_2 | 2015 |
| id_5 | name_2 | 2014 |
Run Code Online (Sandbox Code Playgroud)

我想groupByKey并且reduceGroups会完成工作:

val latestYears = ds
  .groupByKey(_.name)
  .reduceGroups((left, right) => if (left.year > right.year) left else right)
  .map(group => group._2)
Run Code Online (Sandbox Code Playgroud)

但它给出了这个错误,并吐出了很多生成的Java代码:

ERROR CodeGenerator: failed …
Run Code Online (Sandbox Code Playgroud)

code-generation scala exception apache-spark

7
推荐指数
1
解决办法
1180
查看次数

Apache Spark - 数据集操作在抽象基类中失败了吗?

我正在尝试将一些常见代码提取到抽象类中,但遇到了问题.

假设我正在读取格式为"id | name"的文件:

case class Person(id: Int, name: String) extends Serializable

object Persons {
  def apply(lines: Dataset[String]): Dataset[Person] = {
    import lines.sparkSession.implicits._
    lines.map(line => {
      val fields = line.split("\\|")
      Person(fields(0).toInt, fields(1))
    })
  }
}

Persons(spark.read.textFile("persons.txt")).show()
Run Code Online (Sandbox Code Playgroud)

大.这很好用.现在让我们说我想用"名称"字段读取许多不同的文件,因此我将提取出所有常见的逻辑:

trait Named extends Serializable { val name: String }

abstract class NamedDataset[T <: Named] {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("\\|")))
  }
}

case class Person(id: Int, name: String) extends Named

object Persons extends …
Run Code Online (Sandbox Code Playgroud)

abstract-class scala apache-spark

5
推荐指数
1
解决办法
1079
查看次数