如何为 Scala 集合创建编码器(以实现自定义聚合器)?

Unc*_*air 4 scala apache-spark apache-spark-sql apache-spark-encoders

Spark 2.3.0 与 Scala 2.11。我正在Aggregator根据此处的文档实施自定义。聚合器需要 3 种类型的输入、缓冲区和输出。

我的聚合器必须对窗口中所有以前的行进行操作,所以我这样声明:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
Run Code Online (Sandbox Code Playgroud)

覆盖方法之一应该返回缓冲区类型的编码器,在这种情况下是ListBuffer. 我找不到任何合适的编码器,org.apache.spark.sql.Encoders也找不到任何其他编码方式,所以我不知道该返回什么。

我想创建一个新的 case 类,它有一个 type 的单个属性ListBuffer[Foo]并将其用作我的缓冲区类,然后使用Encoders.product它,但我不确定是否有必要或者是否还有其他我遗漏的东西。感谢您提供任何提示。

Jac*_*ski 6

您应该让 Spark SQL 完成它的工作,并使用ExpressionEncoder以下方法找到合适的编码器:

scala> spark.version
res0: String = 2.3.0

case class Mod(id: Long)

import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
Run Code Online (Sandbox Code Playgroud)