尝试将数据帧行映射到更新行时出现编码器错误

Adv*_*ika 33 scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误

无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.

注意:我正在使用spark 2.0!

zer*_*323 71

这里没有什么意外的.您正在尝试使用已使用Spark 1.x编写且Spark 2.0不再支持的代码:

  • 在1.x DataFrame.map((Row) ? T)(ClassTag[T]) ? RDD[T]
  • 在2.x Dataset[Row].map((Row) ? T)(Encoder[T]) ? Dataset[T]

说实话,它在1.x中也没有多大意义.独立于版本,您只需使用DataFrameAPI:

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
Run Code Online (Sandbox Code Playgroud)

如果你真的想使用map你应该使用静态类型Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}
Run Code Online (Sandbox Code Playgroud)

或者至少返回一个具有隐式编码器的对象:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}
Run Code Online (Sandbox Code Playgroud)

最后,如果出于一些完全疯狂的原因,你真的想要映射,Dataset[Row]你必须提供所需的编码器:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)
Run Code Online (Sandbox Code Playgroud)

  • @JaneWayne因为a)你没有得到'DataFrame`和二进制`编码器'提供的性能提升b)你没有得到类型安全c)你明确匹配每个类型,这使得它冗长和容易出错.d)你必须指定`Encoder`的模式.再次详细说明容易出错.对于`flatMap`喜欢'Dataframe``blosde`通常绰绰有余. (9认同)

小智 5

对于预先知道数据帧架构的情况,由@ zero323给出的答案是解决方案

但对于具有动态模式/或将多个数据帧传递给通用函数的场景:以下代码在从1.6.1从2.2.0迁移时为我们工作

import org.apache.spark.sql.Row

val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

该代码在两个版本的spark上执行。

缺点:无法应用spark在dataframe / datasets api上提供的优化。