And*_*rea 6 scala apache-spark apache-spark-sql
I have a DataFrame whose df.printSchema output looks like this:
root
|-- ts: timestamp (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: float (nullable = true)
| | |-- lon: float (nullable = true)
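I know that with, for example, df = df.withColumn("error", lit(null).cast(StringType)) I can add a null field named error, of type String, directly under root. How can I add the same field under the geoip struct, or under the location struct?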
I also tried df = df.withColumn("geoip.error", lit(null).cast(StringType)), with no luck.
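For context, that attempt does not fail with an error. withColumn treats its first argument as a literal column name and does not resolve dots into nested fields, so it adds a new top-level column literally called geoip.error next to the untouched geoip struct. The sketch below shows this effect; it assumes a local SparkSession and a small made-up DataFrame rather than the question's real data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

// Assumes a local SparkSession; in spark-shell, getOrCreate() returns the existing `spark`
val spark = SparkSession.builder().master("local[1]").appName("dotted-name").getOrCreate()
import spark.implicits._

// Made-up sample data with a `geoip` struct, similar in shape to the question's schema
val df = Seq(("Warsaw", "Europe")).toDF("city", "continent")
  .select(struct($"city", $"continent").as("geoip"))

// The name is taken literally: this adds a *top-level* column named "geoip.error"
val tried = df.withColumn("geoip.error", lit(null).cast(StringType))
tried.printSchema()

// Because of the dot in its name, the new column has to be quoted with backticks
tried.select(col("`geoip.error`")).show()
```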
Jac*_*ski 13
TL;DR You have to map the rows in the dataset somehow.
Using the map operator gives you the most flexibility, as you have complete control over the final structure of the rows.
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element.
Your case would look as follows:
// Create a sample dataset to work with
scala> val df = Seq("timestamp").
toDF("ts").
withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
df: org.apache.spark.sql.DataFrame = [ts: string, geoip: struct<city: string, continent: string>]
scala> df.show
+---------+---------------+
| ts| geoip|
+---------+---------------+
|timestamp|[Warsaw,Europe]|
+---------+---------------+
scala> df.printSchema
root
|-- ts: string (nullable = true)
|-- geoip: struct (nullable = false)
| |-- city: string (nullable = false)
| |-- continent: string (nullable = false)
val newDF = df.
as[(String, (String, String))]. // <-- convert to typed Dataset as it makes map easier
map { case (ts, (city, continent)) =>
(ts, (city, continent, "New field with some value")) }. // <-- add new column
toDF("timestamp", "geoip") // <-- name the top-level fields
scala> newDF.printSchema
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: string (nullable = true)
| |-- _3: string (nullable = true)
That's not pretty, as you lose the names of the columns.
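Let's define the schema with the proper names. That's where you can use StructType with StructFields (you could also use a set of case classes, but I leave that as a home exercise for you).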
import org.apache.spark.sql.types._
val geoIP = StructType(
$"city".string ::
$"continent".string ::
$"new_field".string ::
Nil
)
val mySchema = StructType(
$"timestamp".string ::
$"geoip".struct(geoIP) ::
Nil
)
scala> mySchema.printTreeString
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- new_field: string (nullable = true)
Apply the new schema to get the proper names.
val properNamesDF = spark.createDataFrame(newDF.rdd, mySchema)
scala> properNamesDF.show(truncate = false)
+---------+-----------------------------------------+
|timestamp|geoip |
+---------+-----------------------------------------+
|timestamp|[Warsaw,Europe,New field with some value]|
+---------+-----------------------------------------+
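The case-class alternative left as an exercise could be sketched as follows, here applied to the question's own schema so that error ends up inside location. It is a sketch under assumptions: the case class names (Location, GeoIp, Event and their WithError variants) are hypothetical, the sample row is made up, and it is meant to be pasted into spark-shell, since case classes defined inside another class can cause encoder errors. Because the field names come from the case classes, nothing is renamed to _1, _2, _3.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Assumes a local SparkSession; in spark-shell, getOrCreate() returns the existing `spark`
val spark = SparkSession.builder().master("local[1]").appName("case-class-map").getOrCreate()
import spark.implicits._

// Hypothetical case classes mirroring the question's input schema
case class Location(lat: Float, lon: Float)
case class GeoIp(city: String, continent: String, location: Location)
case class Event(ts: Timestamp, geoip: GeoIp)

// Hypothetical target shape: Option[String] is encoded as a nullable string column
case class LocationWithError(lat: Float, lon: Float, error: Option[String])
case class GeoIpWithError(city: String, continent: String, location: LocationWithError)
case class EventWithError(ts: Timestamp, geoip: GeoIpWithError)

// Made-up sample row
val df = Seq(
  Event(Timestamp.valueOf("2017-01-01 00:00:00"), GeoIp("Warsaw", "Europe", Location(52.23f, 21.01f)))
).toDF()

// Typed map: copy every existing value and add error = None (null) inside `location`
val withError = df.as[Event].map { e =>
  val loc = e.geoip.location
  EventWithError(
    e.ts,
    GeoIpWithError(e.geoip.city, e.geoip.continent, LocationWithError(loc.lat, loc.lon, None)))
}.toDF()

withError.printSchema()
```

Since lat and lon are declared as primitive Float here, a row with a null coordinate would fail at runtime; Option[Float] would be the safer choice if such rows can occur.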
If you feel quite adventurous, you may want to treat StructType as a collection type and reshape it using Scala's Collection API and the copy constructor.
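It does not matter how deep you want to go or which level of the "struct of structs" you want to modify. Just think of a StructType as a collection of StructFields, each of which can in turn have a StructType as its data type.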
val oldSchema = newDF.schema
val names = Seq("city", "continent", "new_field")
val geoipFields = oldSchema("geoip").
dataType.
asInstanceOf[StructType].
zip(names).
map { case (field, name) => field.copy(name = name) }
val myNewSchema = StructType(
$"timestamp".string ::
$"geoip".struct(StructType(geoipFields)) :: Nil)
val properNamesDF = spark.createDataFrame(newDF.rdd, myNewSchema)
scala> properNamesDF.printSchema
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- new_field: string (nullable = true)
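Taking the "collection of StructFields" idea one step further, a small recursive helper can append a field at any depth of a schema. addNestedField is a hypothetical name, not a Spark API, and it only rewrites the StructType: to use its result with spark.createDataFrame(df.rdd, schema), the rows must already carry a value in that position (as newDF does above). For adding a field to existing data, the withColumn/struct approaches are a better fit.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper (not part of Spark): returns a copy of `schema` with `field`
// appended to the struct reached by `path`, e.g. Seq("geoip", "location").
// If the path does not exist, the schema is returned unchanged.
def addNestedField(schema: StructType, path: Seq[String], field: StructField): StructType =
  path match {
    case Seq() =>
      // Reached the target struct: append the new field at the end
      StructType(schema.fields.toSeq :+ field)
    case head +: tail =>
      StructType(schema.fields.toSeq.map {
        // Recurse only into the struct field whose name matches the next path element
        case f @ StructField(`head`, inner: StructType, _, _) =>
          f.copy(dataType = addNestedField(inner, tail, field))
        // Every other field is kept as it is
        case f => f
      })
  }

// The question's schema, written out by hand (StructField defaults to nullable = true)
val questionSchema = StructType(Seq(
  StructField("ts", TimestampType),
  StructField("geoip", StructType(Seq(
    StructField("city", StringType),
    StructField("continent", StringType),
    StructField("location", StructType(Seq(
      StructField("lat", FloatType),
      StructField("lon", FloatType)
    )))
  )))
))

val error = StructField("error", StringType, nullable = true)
val geoipLevel = addNestedField(questionSchema, Seq("geoip"), error)
val locationLevel = addNestedField(questionSchema, Seq("geoip", "location"), error)
locationLevel.printTreeString()
```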
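You could use the withColumn operator with the struct function.
withColumn(colName: String, col: Column): DataFrame Returns a new Dataset by adding a column or replacing the existing column that has the same name.
struct(cols: Column*): Column Creates a new struct column.
The code could look as follows: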
val anotherNewDF = df.
withColumn("geoip", // <-- use the same column name so you hide the existing one
struct(
$"geoip.city", // <-- reference existing column to copy the values
$"geoip.continent",
lit("new value") as "new_field")) // <-- new field with fixed value
scala> anotherNewDF.printSchema
root
|-- ts: string (nullable = true)
|-- geoip: struct (nullable = false)
| |-- city: string (nullable = false)
| |-- continent: string (nullable = false)
| |-- new_field: string (nullable = false)
As per @shj's comment, you can use a wildcard to avoid re-listing the columns, which makes it quite flexible, e.g.
val anotherNewDF = df
.withColumn("geoip",
struct(
$"geoip.*", // <-- the wildcard here
lit("new value") as "new_field"))
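For the location case in the question (before Spark 3.1), the wildcard can also be used one level down. Note that $"geoip.*" on its own would copy the old location struct unchanged, so this sketch lists the other geoip fields explicitly and rebuilds location from $"geoip.location.*" plus the new field. The sample data is made up and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumes a local SparkSession; in spark-shell, getOrCreate() returns the existing `spark`
val spark = SparkSession.builder().master("local[1]").appName("nested-struct").getOrCreate()
import spark.implicits._

// Made-up data shaped like the question's geoip, with a nested location struct
val df = Seq(("Warsaw", "Europe", 52.23f, 21.01f)).toDF("city", "continent", "lat", "lon")
  .select(struct($"city", $"continent", struct($"lat", $"lon").as("location")).as("geoip"))

// Rebuild geoip under the same name: copy the plain fields, then rebuild
// location from its existing fields (wildcard) plus the new null `error` field
val withLocationError = df.withColumn("geoip",
  struct(
    $"geoip.city",
    $"geoip.continent",
    struct($"geoip.location.*", lit(null).cast("string").as("error")).as("location")))

withLocationError.printSchema()
```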
Spark 3.1+
col("geoip").withField("error", lit(null).cast("string"))
Example input:
val df = Seq(("Vilnius", "Europe", 1, 1)).toDF("city", "continent", "lat", "lon")
.withColumn("location", struct("lat", "lon").as("location"))
.select(struct("city", "continent", "location").as("geoip"))
df.printSchema()
// root
// |-- geoip: struct (nullable = false)
// | |-- city: string (nullable = true)
// | |-- continent: string (nullable = true)
// | |-- location: struct (nullable = false)
// | | |-- lat: integer (nullable = false)
// | | |-- lon: integer (nullable = false)
Example #1
val df2 = df.withColumn("geoip", col("geoip").withField("error", lit(null).cast("string")))
df2.printSchema()
// root
// |-- geoip: struct (nullable = false)
// | |-- city: string (nullable = true)
// | |-- continent: string (nullable = true)
// | |-- location: struct (nullable = false)
// | | |-- lat: integer (nullable = false)
// | | |-- lon: integer (nullable = false)
// | |-- error: string (nullable = true)
Example #2
val df3 = df2.withColumn("geoip", col("geoip").withField("location.error", lit(null).cast("string")))
df3.printSchema()
// root
// |-- geoip: struct (nullable = false)
// | |-- city: string (nullable = true)
// | |-- continent: string (nullable = true)
// | |-- location: struct (nullable = false)
// | | |-- lat: integer (nullable = false)
// | | |-- lon: integer (nullable = false)
// | | |-- error: string (nullable = true)
// | |-- error: string (nullable = true)
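Since withField returns a Column, the two examples can also be combined by chaining the calls inside a single withColumn. According to the Column.withField documentation, it adds or replaces a field by name, so calling it again with an existing name overwrites that field instead of adding a duplicate. A minimal sketch, assuming Spark 3.1+, a local SparkSession and made-up data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumes a local SparkSession; in spark-shell, getOrCreate() returns the existing `spark`
val spark = SparkSession.builder().master("local[1]").appName("with-field").getOrCreate()
import spark.implicits._

// Made-up input with the same shape as the example input above
val df = Seq(("Vilnius", "Europe", 1, 1)).toDF("city", "continent", "lat", "lon")
  .select(struct($"city", $"continent", struct($"lat", $"lon").as("location")).as("geoip"))

val nullString = lit(null).cast("string")

// Both withField calls are chained on the same column expression (Spark 3.1+)
val both = df.withColumn("geoip",
  col("geoip")
    .withField("error", nullString)            // appended at the end of geoip
    .withField("location.error", nullString))  // appended at the end of geoip.location

// Using an existing field name replaces that field in place
val replaced = both.withColumn("geoip", col("geoip").withField("error", lit("unknown")))
replaced.printSchema()
```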