Unc*_*air 4 scala apache-spark apache-spark-sql apache-spark-encoders
Spark 2.3.0 与 Scala 2.11。我正在Aggregator根据此处的文档实施自定义。聚合器需要 3 种类型的输入、缓冲区和输出。
我的聚合器必须对窗口中所有以前的行进行操作,所以我这样声明:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
Run Code Online (Sandbox Code Playgroud)
覆盖方法之一应该返回缓冲区类型的编码器,在这种情况下是ListBuffer. 我找不到任何合适的编码器,org.apache.spark.sql.Encoders也找不到任何其他编码方式,所以我不知道该返回什么。
我想创建一个新的 case 类,它有一个 type 的单个属性ListBuffer[Foo]并将其用作我的缓冲区类,然后使用Encoders.product它,但我不确定是否有必要或者是否还有其他我遗漏的东西。感谢您提供任何提示。
您应该让 Spark SQL 完成它的工作,并使用ExpressionEncoder以下方法找到合适的编码器:
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1054 次 |
| 最近记录: |