如何在不转换DataFrame并访问数据集的情况下向数据集添加列?

vde*_*dep 8 scala apache-spark

我知道使用.withColumn()和a 向Spark DataSet添加新列的方法UDF,它返回一个DataFrame.我也知道,我们可以将生成的DataFrame转换为DataSet.

我的问题是:

  1. 如果我们仍然遵循传统的DF方法(即将列名称作为UDF输入的字符串传递),DataSet的类型安全性如何在这里发挥作用
  2. 是否存在访问列的"面向对象的方式"(不将列名作为字符串传递),就像我们以前用RDD一样,用于追加新列.
  3. 如何在正常操作中访问新列,如map,filter等?

例如:

    scala> case class Temp(a : Int, b : String)    //creating case class
    scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS    // creating DS
    scala> val appendUDF = udf( (b : String) => b + "ing")      // sample UDF

    scala> df.withColumn("c",df("b"))   // adding a new column
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

    scala> res5.as[Temp]   // converting to DS
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

    scala> res6.map( x =>x.  
    // list of autosuggestion :
    a   canEqual   equals     productArity     productIterator   toString   
    b   copy       hashCode   productElement   productPrefix 
Run Code Online (Sandbox Code Playgroud)

c我添加使用的新列.withColumn()无法访问,因为列c在转换为DS时使用的情况下不在case类Temp(它只包含a&b)中res5.as[Temp].

如何访问列c

maa*_*asg 6

Datasets 的类型安全世界中,您将结构映射到另一个结构.

也就是说,对于每次转换,我们都需要数据的模式表示(因为它是RDD所需的).要访问上面的"c",我们需要创建一个新模式来提供对它的访问.

case class A(a:String)
case class BC(b:String, c:String)
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC

val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]

val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]
Run Code Online (Sandbox Code Playgroud)