Spark"CodeGenerator:无法编译"与Dataset.groupByKey

kpo*_*kpo 7 code-generation scala exception apache-spark

我是Scala和Spark的新手,所以希望有人能告诉我这里我出错的地方.

我有一个三列数据集(id,name,year),我想找到每个名字的最近一年.换一种说法:

BEFORE                                          AFTER
| id_1 | name_1 | 2015 |                        | id_2 | name_1 | 2016 |
| id_2 | name_1 | 2016 |                        | id_4 | name_2 | 2015 |
| id_3 | name_1 | 2014 | 
| id_4 | name_2 | 2015 |
| id_5 | name_2 | 2014 |
Run Code Online (Sandbox Code Playgroud)

我想groupByKey并且reduceGroups会完成工作:

val latestYears = ds
  .groupByKey(_.name)
  .reduceGroups((left, right) => if (left.year > right.year) left else right)
  .map(group => group._2)
Run Code Online (Sandbox Code Playgroud)

但它给出了这个错误,并吐出了很多生成的Java代码:

ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 21, Column 101: Unknown variable or type "value4"
Run Code Online (Sandbox Code Playgroud)

有趣的是,如果我创建一个只包含name和year列的数据集,它将按预期工作.


这是我正在运行的完整代码:

object App {

  case class Record(id: String, name: String, year: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
    import spark.implicits._

    val ds = spark.createDataset[String](Seq(
        "id_1,name_1,2015",
        "id_2,name_1,2016",
        "id_3,name_1,2014",
        "id_4,name_2,2015",
        "id_5,name_2,2014"
      ))
      .map(line => {
        val fields = line.split(",")
        new Record(fields(0), fields(1), fields(2).toInt)
      })

    val latestYears = ds
      .groupByKey(_.name)
      .reduceGroups((left, right) => if (left.year > right.year) left else right)
      .map(group => group._2)

    latestYears.show()
  }


}
Run Code Online (Sandbox Code Playgroud)

编辑:我相信这可能是Spark v2.0.1的一个错误.降级到v2.0.0后,不再发生这种情况.

com*_*ist 0

您的groupByreduceGroups功能是实验性的。为什么不使用reduceByKeyapi)?

优点:

  • 从您拥有的代码中翻译应该很容易。
  • 它更稳定(不是实验性的)。
  • 它应该更高效,因为它不需要对每个组中的所有项目进行完全洗牌(这也会导致网络 I/O 减慢并溢出节点中的内存)。