Getting struct-type elements of a Row in Scala using a function

Chi*_*iku 1 scala apache-spark apache-spark-sql

I am doing some row-level computation in Scala/Spark. I have a DataFrame created from the JSON below -

{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc", "def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP", "timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}

You can create a DataFrame directly from this JSON. My schema looks like this -

root
 |-- available: boolean (nullable = true)
 |-- createTime: string (nullable = true)
 |-- dataValue: struct (nullable = true)
 |    |-- another_source: string (nullable = true)
 |    |-- another_source_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first: string (nullable = true)
 |    |    |    |-- last: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- names_source: struct (nullable = true)
 |    |    |-- first_names: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- last_names_id: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |-- timestamp: string (nullable = true)
 |-- deleteTime: string (nullable = true)

I am reading all the columns with a readSchema and writing them out with a writeSchema. Of the two complex columns, I can handle one but not the other.

Here is part of my read schema -

  .add("names_source", StructType(
      StructField("first_names", ArrayType.apply(StringType)) ::
        StructField("last_names_id", ArrayType.apply(DoubleType)) ::
        Nil
))
.add("another_source_array", ArrayType(StructType(
      StructField("first", StringType) ::
        StructField("last", StringType) ::
        Nil
)))

Here is part of my write schema -

.add("names_source", StructType.apply(Seq(
           StructField("first_names", StringType),
            StructField("last_names_id", DoubleType))
  ))
  .add("another_source_array", ArrayType(StructType.apply(Seq(
           StructField("first", StringType),
            StructField("last", StringType))
   )))

During processing, I use a method that indexes all the columns. Below is my function -

   def myMapRedFunction(df: DataFrame, spark: SparkSession): DataFrame = {

    val columnIndex = dataIndexingSchema.fieldNames.zipWithIndex.toMap

    val myRDD = df.rdd
      .map(row => {
      Row(
        row.getAs[Boolean](columnIndex("available")),
        parseDate(row.getAs[String](columnIndex("create_time"))),
        ??I Need help here??
        row.getAs[String](columnIndex("another_source")),
        anotherSourceArrayFunction(row.getSeq[Row](columnIndex("another_source_array"))),
        row.getAs[String](columnIndex("location")),
        row.getAs[String](columnIndex("timestamp")),
        parseDate(row.getAs[String](columnIndex("delete_time")))
      )
    }).distinct

    spark.createDataFrame(myRDD, dataWriteSchema)
  }

The another_source_array column is processed by the anotherSourceArrayFunction method to make sure we get the schema as required. I need a similar function to handle the names_source column. Below is the function I use for the another_source_array column.

    def anotherSourceArrayFunction(data: Seq[Row]): Seq[Row] = {
    if (data == null) {
      data
    } else {
      data.map(r => {
        val first = r.getAs[String]("first").toUpperCase
        val last = r.getAs[String]("last")
        new GenericRowWithSchema(Array(first,last), StructType(
          StructField("first", StringType) ::
            StructField("last", StringType) ::
            Nil
        ))
      })
    }
  }

In short, I need something like this, where I can get the structure of my names_source column as a struct.

names_source:struct<first_names:array<string>,last_names_id:array<bigint>>
another_source_array:array<struct<first:string,last:string>>

The above are the column schemas I ultimately need. I am able to get another_source_array correctly and need help with names_source. I think the schema I wrote for this column is correct, but I'm not sure. In the end, I need names_source:struct<first_names:array<string>,last_names_id:array<bigint>> as the column schema.
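One way to sanity-check a hand-written StructType against a target schema string like the one above is to compare its simpleString. A sketch (note that bigint corresponds to LongType, not the DoubleType used in the read schema earlier):

```scala
import org.apache.spark.sql.types._

// Hand-written schema for names_source. LongType's simpleString is
// "bigint", which matches the desired column schema exactly.
val namesSourceSchema = StructType(Seq(
  StructField("first_names", ArrayType(StringType)),
  StructField("last_names_id", ArrayType(LongType))
))

// Prints: struct<first_names:array<string>,last_names_id:array<bigint>>
println(namesSourceSchema.simpleString)
```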

Note: I can already get the another_source_array column without any problem. I kept that function here only to make the question easier to understand.

Ram*_*jan 7

From all the code you have tried, what I can see is that you are trying to flatten the struct dataValue column into separate columns.

If my assumption is correct, then you don't have to go through such a complicated process. You can simply do the following:

val myRDD = df.rdd
  .map(row => {
    Row(
      row.getAs[Boolean]("available"),
      parseDate(row.getAs[String]("createTime")),
      row.getAs[Row]("dataValue").getAs[Row]("names_source"),
      row.getAs[Row]("dataValue").getAs[String]("another_source"),
      row.getAs[Row]("dataValue").getAs[Seq[Row]]("another_source_array"),
      row.getAs[Row]("dataValue").getAs[String]("location"),
      row.getAs[Row]("dataValue").getAs[String]("timestamp"),
      parseDate(row.getAs[String]("deleteTime"))
    )
  }).distinct

import org.apache.spark.sql.types._

val dataWriteSchema = StructType(Seq(
  StructField("available", BooleanType, true),
  StructField("createTime", DateType, true),
  StructField("names_source", StructType(Seq(StructField("first_names", ArrayType(StringType), true), StructField("last_names_id", ArrayType(LongType), true))), true),
  StructField("another_source", StringType, true),
  StructField("another_source_array", ArrayType(StructType.apply(Seq(StructField("first", StringType),StructField("last", StringType)))), true),
  StructField("location", StringType, true),
  StructField("timestamp", StringType, true),
  StructField("deleteTime", DateType, true)
))

spark.createDataFrame(myRDD, dataWriteSchema).show(false)
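In the code above the nested names_source Row is simply passed through unchanged. If that struct ever did need per-field processing, a helper in the same style as anotherSourceArrayFunction might look like the sketch below; namesSourceFunction is a hypothetical name, not part of the original code:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

// Hypothetical helper: rebuilds the names_source struct with an explicit
// schema, leaving both arrays unchanged. Any per-element transformation
// (e.g. upper-casing names) would go where the values are read out.
def namesSourceFunction(data: Row): Row = {
  if (data == null) {
    data
  } else {
    val firstNames = data.getAs[Seq[String]]("first_names")
    val lastNamesId = data.getAs[Seq[Long]]("last_names_id")
    new GenericRowWithSchema(Array(firstNames, lastNamesId), StructType(
      StructField("first_names", ArrayType(StringType)) ::
        StructField("last_names_id", ArrayType(LongType)) ::
        Nil
    ))
  }
}
```

It would be called the same way as the array helper, e.g. namesSourceFunction(row.getAs[Row]("dataValue").getAs[Row]("names_source")).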


Using * to flatten the struct column

You can simply use .* on the struct column to expand the struct's elements into separate columns:

import org.apache.spark.sql.functions._
df.select(col("available"), col("createTime"), col("dataValue.*"), col("deleteTime")).show(false)

Note that with this approach you would have to convert the string date columns to DateType yourself.
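That conversion can be sketched as follows, reusing the sample record from the question; the "yyyy-MM-dd" format pattern is assumed from the sample dates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("flatten").getOrCreate()
import spark.implicits._

// The sample record from the question.
val json = Seq("""{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc","def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP","timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}""")
val df = spark.read.json(json.toDS)

// Flatten dataValue with .* and cast the string dates to DateType.
val flattened = df
  .select(col("available"), col("createTime"), col("dataValue.*"), col("deleteTime"))
  .withColumn("createTime", to_date(col("createTime"), "yyyy-MM-dd"))
  .withColumn("deleteTime", to_date(col("deleteTime"), "yyyy-MM-dd"))
```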

In both cases you will get the output:

+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+
|available|createTime|names_source                                   |another_source|another_source_array|location|timestamp |deleteTime|
+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+
|false    |2016-01-08|[WrappedArray(abc, def),WrappedArray(123, 456)]|TableSources  |[[1.1,ONE]]         |GMP     |2018-02-11|2016-01-08|
+---------+----------+-----------------------------------------------+--------------+--------------------+--------+----------+----------+

I hope the answer is helpful.