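以下代码会抛出异常:java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined(无法在没有 schema 的 Row 上调用 fieldIndex)。具体情形是:先对原始 DataFrame 使用 ExpressionEncoder 调用 groupByKey 和 flatMapGroups,得到一个新的 DataFrame;再对这个返回的 DataFrame 再次调用 groupByKey 和 flatMapGroups 时,就会发生该异常。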
逻辑流程:originalDf -> groupByKey -> flatMapGroups -> groupByKey -> flatMapGroups -> show
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ IntegerType, StructField, StructType}
import scala.collection.mutable.ListBuffer
object Test {
def main(args: Array[String]): Unit = {
val values = List(List("1", "One") ,List("1", "Two") ,List("2", "Three"),List("2","4")).map(x =>(x(0), x(1)))
val session = SparkSession.builder.config("spark.master", "local").getOrCreate
import session.implicits._
val dataFrame = values.toDF
dataFrame.show()
dataFrame.printSchema()
val newSchema = StructType(dataFrame.schema.fields
++ Array(
StructField("Count", IntegerType, false)
)
)
val expr = RowEncoder.apply(newSchema)
val tranform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
val inputSeq = inputItr.toSeq …
Run Code Online (Sandbox Code Playgroud)