Spark SQL-通用数据集读取器

Ode*_*erg 1 scala apache-spark apache-spark-sql apache-spark-dataset

我试图创建通用的DataSet [T]阅读器,以避免每个阅读器调用都使用dataframe.as [..]。对原始类型和案例类的支持,所以我在想类似的东西:

def read[T <: Product](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}
Run Code Online (Sandbox Code Playgroud)

但是我收到“无法为数据集中存储的类型找到编码器”错误。可以做这样的事情吗?

第二周期:

def read[T <: Product](sql : String) : Dataset[T] = {
  import sparkSession.implicits._
  innerRead(sql)
}

private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}  
Run Code Online (Sandbox Code Playgroud)

以类型不匹配结尾(foudn Encoder [Nothing],必需的Encoder [T])。

我试图仅导入newProductEncoder,但结果相同。

ste*_*ino 5

为了将a转换DataFrame为a,Dataset您需要有一个Encoder。你可以通过简单地添加绑定在和上下文做EncoderT

def read[T <: Product : Encoder](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}
Run Code Online (Sandbox Code Playgroud)

上下文绑定是以下语法糖:

def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
Run Code Online (Sandbox Code Playgroud)

这意味着您需要在隐式上下文中具有(仅一个)实例Encoder[T]

这是必需的,因为as方法本身需要此上下文绑定。

通过导入(如您所做的那样),Spark本身可以为您提供您Encoder可能需要的大多数s(到目前为止,基元,Strings和case classes)SparkSession。但是,这些必须在呼叫站点的隐式范围内可用,这意味着您想要的内容可能更像以下内容:

def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
  import spark.implicits._
  val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
  df.as[T]
}

val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
Run Code Online (Sandbox Code Playgroud)