And*_*rea 6 scala apache-spark apache-spark-sql
我有一个类似这样的数据框df.printSchema:
root
|-- ts: timestamp (nullable = true)
|-- geoip: struct (nullable = true)
|    |-- city: string (nullable = true)
|    |-- continent: string (nullable = true)
|    |-- location: struct (nullable = true)
|    |    |-- lat: float (nullable = true)
|    |    |-- lon: float (nullable = true)
我知道,例如用df = df.withColumn("error", lit(null).cast(StringType))我可以添加一个null名为场error型的String正下方root.如何在geoipStruct下或Struct 下添加相同的字段location?
我也试过df = df.withColumn("geoip.error", lit(null).cast(StringType))没有运气.
Jac*_*ski 13
TL; DR您必须以某种方式映射数据集中的行.
使用map操作可以提供最大的灵活性,因为您可以完全控制行的最终结构.
map [U](func:(T)⇒U)(隐式arg0:编码器[U]):数据集[U](特定于Scala)返回包含应用于
func每个元素的结果的新数据集.
您的案例如下:
// Create a sample dataset to work with
scala> val df = Seq("timestamp").
  toDF("ts").
  withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
df: org.apache.spark.sql.DataFrame = [ts: string, geoip: struct<city: string, continent: string>]
scala> df.show
+---------+---------------+
|       ts|          geoip|
+---------+---------------+
|timestamp|[Warsaw,Europe]|
+---------+---------------+
scala> df.printSchema
root
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)
val newDF = df.
  as[(String, (String, String))].  // <-- convert to typed Dataset as it makes map easier
  map { case (ts, (city, continent)) =>
    (ts, (city, continent, "New field with some value")) }. // <-- add new column
  toDF("timestamp", "geoip") // <-- name the top-level fields
scala> newDF.printSchema
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)
当你丢失列的名字时,这并不漂亮.
让我们用正确的名称定义模式.这就是你可以将StructType与StructFields 一起使用的地方(你也可以使用一组案例类,但我把它作为家庭练习留给你).
import org.apache.spark.sql.types._
val geoIP = StructType(
  $"city".string ::
  $"continent".string ::
  $"new_field".string ::
  Nil
)
val mySchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(geoIP) ::
  Nil
)
scala> mySchema.printTreeString
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)
将新模式应用于正确的名称.
val properNamesDF = spark.createDataFrame(newDF.rdd, mySchema)
scala> properNamesDF.show(truncate = false)
+---------+-----------------------------------------+
|timestamp|geoip                                    |
+---------+-----------------------------------------+
|timestamp|[Warsaw,Europe,New field with some value]|
+---------+-----------------------------------------+
如果您觉得相当冒险,您可能希望将其StructType作为集合类型使用,并使用Scala的Collection API和复制构造函数重新塑造它.
你想要去多深,以及想要修改什么级别的"结构体结构"并不重要.只需将StructType视为StructField的集合,而StructField又可以是StructType.
val oldSchema = newDF.schema
val names = Seq("city", "continent", "new_field")
val geoipFields = oldSchema("geoip").
  dataType.
  asInstanceOf[StructType].
  zip(names).
  map { case (field, name) => field.copy(name = name) }
val myNewSchema = StructType(
  $"timestamp".string :: 
  $"geoip".struct(StructType(geoipFields)) :: Nil)
val properNamesDF = spark.createDataFrame(newDF.rdd, myNewSchema)
scala> properNamesDF.printSchema
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)
您可以将withColumn运算符与struct函数一起使用.
withColumn(colName:String,col:Column):DataFrame通过添加列或替换具有相同名称的现有列来返回新的数据集.
struct(cols:Column*):Column创建一个新的struct列.
代码可能如下所示:
val anotherNewDF = df.
  withColumn("geoip", // <-- use the same column name so you hide the existing one
    struct(
      $"geoip.city", // <-- reference existing column to copy the values
      $"geoip.continent",
      lit("new value") as "new_field")) // <-- new field with fixed value
scala> anotherNewDF.printSchema
root
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)
 |    |-- new_field: string (nullable = false)
根据@shj的评论,您可以使用通配符来避免重新列出列,这使得它非常灵活,例如
val anotherNewDF = df
  .withColumn("geoip",
    struct(
      $"geoip.*", // <-- the wildcard here
      lit("new value") as "new_field"))
火花3.1+
col("geoip").withField("error", lit(null).cast("string"))
输入示例:
val df = Seq(("Vilnius", "Europe", 1, 1)).toDF("city", "continent", "lat", "lon")
         .withColumn("location", struct("lat", "lon").as("location"))
         .select(struct("city", "continent", "location").as("geoip"))
df.printSchema()
// root
//  |-- geoip: struct (nullable = false)
//  |    |-- city: string (nullable = true)
//  |    |-- continent: string (nullable = true)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- lat: integer (nullable = false)
//  |    |    |-- lon: integer (nullable = false)
例子#1
val df2 = df.withColumn("geoip", col("geoip").withField("error", lit(null).cast("string")))
df2.printSchema()
// root
//  |-- geoip: struct (nullable = false)
//  |    |-- city: string (nullable = true)
//  |    |-- continent: string (nullable = true)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- lat: integer (nullable = false)
//  |    |    |-- lon: integer (nullable = false)
//  |    |-- error: string (nullable = true)
例子#2
val df3 = df2.withColumn("geoip", col("geoip").withField("location.error", lit(null).cast("string")))
df3.printSchema()
// root
//  |-- geoip: struct (nullable = false)
//  |    |-- city: string (nullable = true)
//  |    |-- continent: string (nullable = true)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- lat: integer (nullable = false)
//  |    |    |-- lon: integer (nullable = false)
//  |    |    |-- error: string (nullable = true)
//  |    |-- error: string (nullable = true)
| 归档时间: | 
 | 
| 查看次数: | 4338 次 | 
| 最近记录: |