I am doing some row-level computations in Scala/Spark. I have a dataframe created from the JSON below -
{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc", "def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP", "timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}
The dataframe can be created directly from this JSON (a minimal read sketch follows the schema below). My schema looks like this -
root
|-- available: boolean (nullable = true)
|-- createTime: string (nullable = true)
|-- dataValue: struct (nullable = true)
| |-- another_source: string (nullable = true)
| |-- another_source_array: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- first: string (nullable = true)
| | | |-- last: string (nullable = true)
| |-- location: string (nullable = true)
| |-- names_source: struct (nullable = true)
| | |-- first_names: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- last_names_id: array (nullable = true)
| | | |-- element: long (containsNull = true)
| |-- timestamp: string (nullable = true)
|-- deleteTime: string (nullable = true)
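For reference, a minimal sketch of how a dataframe like the one above can be created from that single-line JSON (assuming Spark 2.2+; the variable names are only illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()
import spark.implicits._

// The JSON record shown above, as a single-line string.
val json = """{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc","def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP","timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}"""

// spark.read.json on a Dataset[String] infers the schema printed above.
val df = spark.read.json(Seq(json).toDS())
df.printSchema()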
I am reading all the columns individually with a readSchema and writing them out with a writeSchema. Of the two complex columns, I am able to handle one but not the other.
Below is part of my read schema -
.add("names_source", StructType(
StructField("first_names", ArrayType.apply(StringType)) ::
StructField("last_names_id", ArrayType.apply(DoubleType)) ::
Nil
))
.add("another_source_array", ArrayType(StructType(
StructField("first", StringType) ::
StructField("last", StringType) ::
Nil
)))
And here is part of my write schema -
.add("names_source", StructType.apply(Seq(
StructField("first_names", StringType),
StructField("last_names_id", DoubleType))
))
.add("another_source_array", ArrayType(StructType.apply(Seq(
StructField("first", StringType),
StructField("last", StringType))
)))
During processing I use a method that indexes all the columns. Below is my function code -
def myMapRedFunction(df: DataFrame, spark: SparkSession): DataFrame = {
  val columnIndex = dataIndexingSchema.fieldNames.zipWithIndex.toMap
  val myRDD = df.rdd
    .map(row => {
      Row(
        row.getAs[Boolean](columnIndex("available")),
        parseDate(row.getAs[String](columnIndex("create_time"))),
        ??I Need help here??
        row.getAs[String](columnIndex("another_source")),
        anotherSourceArrayFunction(row.getSeq[Row](columnIndex("another_source_array"))),
        row.getAs[String](columnIndex("location")),
        row.getAs[String](columnIndex("timestamp")),
        parseDate(row.getAs[String](columnIndex("delete_time")))
      )
    }).distinct
  spark.createDataFrame(myRDD, dataWriteSchema)
}
The another_source_array column is processed by the anotherSourceArrayFunction method so that we get the schema as required. I need a similar function for the names_source column. Below is the function I use for the another_source_array column (a sketch of what an equivalent helper for names_source might look like follows it).
def anotherSourceArrayFunction(data: Seq[Row]): Seq[Row] = {
  if (data == null) {
    data
  } else {
    data.map(r => {
      val first = r.getAs[String]("first").toUpperCase
      val last = r.getAs[String]("last")
      new GenericRowWithSchema(Array(first, last), StructType(
        StructField("first", StringType) ::
        StructField("last", StringType) ::
        Nil
      ))
    })
  }
}
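For reference, a minimal sketch of such a helper for names_source, assuming the column arrives as a Row holding the two array fields (the name namesSourceFunction and the null handling are illustrative, not code from the question):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

// Hypothetical helper: rebuilds the names_source struct with array<string> and array<bigint> fields.
def namesSourceFunction(data: Row): Row = {
  if (data == null) {
    data
  } else {
    val firstNames  = data.getAs[Seq[String]]("first_names")
    val lastNamesId = data.getAs[Seq[Long]]("last_names_id")
    new GenericRowWithSchema(Array(firstNames, lastNamesId), StructType(
      StructField("first_names", ArrayType(StringType)) ::
      StructField("last_names_id", ArrayType(LongType)) ::
      Nil
    ))
  }
}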
In short, I need something like this, where I can get the structure of my names_source column as a struct:
names_source:struct<first_names:array<string>,last_names_id:array<bigint>>
another_source_array:array<struct<first:string,last:string>>
These are the final column schemas I need. I am able to get another_source_array correctly and need help with names_source. I think the schema I wrote for that column is correct, but I'm not sure. In the end, I need names_source:struct<first_names:array<string>,last_names_id:array<bigint>> as the column schema.
Note: I can get the another_source_array column without any issues; I kept that function here only to make things clearer.
From all the code you have tried, what I can see is that you are trying to flatten the struct column dataValue into separate columns. If my assumption is right, then you don't have to go through such a complex process. You can simply do the following:
val myRDD = df.rdd
  .map(row => {
    Row(
      row.getAs[Boolean]("available"),
      parseDate(row.getAs[String]("createTime")),
      row.getAs[Row]("dataValue").getAs[Row]("names_source"),
      row.getAs[Row]("dataValue").getAs[String]("another_source"),
      row.getAs[Row]("dataValue").getAs[Seq[Row]]("another_source_array"),
      row.getAs[Row]("dataValue").getAs[String]("location"),
      row.getAs[Row]("dataValue").getAs[String]("timestamp"),
      parseDate(row.getAs[String]("deleteTime"))
    )
  }).distinct

import org.apache.spark.sql.types._
val dataWriteSchema = StructType(Seq(
  StructField("available", BooleanType, true),
  StructField("createTime", DateType, true),
  StructField("names_source", StructType(Seq(
    StructField("first_names", ArrayType(StringType), true),
    StructField("last_names_id", ArrayType(LongType), true))), true),
  StructField("another_source", StringType, true),
  StructField("another_source_array", ArrayType(StructType.apply(Seq(
    StructField("first", StringType),
    StructField("last", StringType)))), true),
  StructField("location", StringType, true),
  StructField("timestamp", StringType, true),
  StructField("deleteTime", DateType, true)
))

spark.createDataFrame(myRDD, dataWriteSchema).show(false)
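parseDate is not shown in the question; a minimal null-safe sketch that would satisfy the DateType fields above (assuming the dates are plain yyyy-MM-dd strings) could be:

import java.sql.Date

// Hypothetical helper: DateType columns expect java.sql.Date values,
// so convert the "yyyy-MM-dd" strings accordingly (null-safe).
def parseDate(s: String): Date =
  if (s == null) null else Date.valueOf(s)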
Alternatively, you can simply use .* on the struct column to get the elements of the struct as separate columns:
import org.apache.spark.sql.functions._
df.select(col("available"), col("createTime"), col("dataValue.*"), col("deleteTime")).show(false)
With this approach you will have to change the string date columns to DateType yourself.
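For example, a sketch using to_date (the yyyy-MM-dd format is an assumption based on the sample data):

import org.apache.spark.sql.functions._

// Flatten dataValue with .* and cast the string date columns to DateType.
val flattened = df
  .select(col("available"), col("createTime"), col("dataValue.*"), col("deleteTime"))
  .withColumn("createTime", to_date(col("createTime"), "yyyy-MM-dd"))
  .withColumn("deleteTime", to_date(col("deleteTime"), "yyyy-MM-dd"))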
In either case you will get the following output:
+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+
|available|createTime|names_source |another_source|another_source_array|location|timestamp |deleteTime|
+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+
|false |2016-01-08|[WrappedArray(abc, def),WrappedArray(123, 456)]|TableSources |[[1.1,ONE]] |GMP |2018-02-11|2016-01-08|
+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+
I hope the answer is helpful.