Wah*_*vic 4 scala apache-spark apache-spark-dataset apache-spark-encoders
我在Zeppelin笔记本上使用Spark,而groupByKey()似乎不起作用.
这段代码:
df.groupByKey(row => row.getLong(0))
.mapGroups((key, iterable) => println(key))
Run Code Online (Sandbox Code Playgroud)
给我这个错误(可能是一个编译错误,因为它在我正在处理的数据集很大的时候很快出现):
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Run Code Online (Sandbox Code Playgroud)
我尝试添加一个case类并将所有行映射到它中,但仍然遇到了同样的错误
import spark.implicits._
case class DFRow(profileId: Long, jobId: String, state: String)
def getDFRow(row: Row):DFRow = {
return DFRow(row.getLong(row.fieldIndex("item0")),
row.getString(row.fieldIndex("item1")),
row.getString(row.fieldIndex("item2")))
}
df.map(DFRow(_))
.groupByKey(row => row.getLong(0))
.mapGroups((key, iterable) => println(key))
Run Code Online (Sandbox Code Playgroud)
我的Dataframe的架构是:
root
|-- item0: long (nullable = true)
|-- item1: string (nullable = true)
|-- item2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
你想mapGroups用的功能(Long, Iterator[Row]) => Unit,也没有Encoder对Unit(而不是它将使意义有一个).
在的通用部分Dataset未集中在SQL DSL(API DataFrame => DataFrame,DataFrame => RelationalGroupedDataset,RelationalGroupedDataset => DataFrame,RelationalGroupedDataset => RelationalGroupedDataset)需要的输出值隐式或显式的编码器.
由于没有预定义的Row对象编码器,因此使用Dataset[Row]静态类型数据的方法设计没有多大意义.根据经验,您应该首先转换为静态类型的变体:
df.as[(Long, String, String)]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10602 次 |
| 最近记录: |