小编mah*_*hdi的帖子

使用定义的 StructType 转换 Spark 数据帧的值

有没有办法使用 StructType 来转换数据帧的所有值?

让我用一个例子来解释我的问题:

假设我们从文件读取后获得了一个数据帧(我提供了生成此数据帧的代码,但在我的现实世界项目中,我在从文件读取后获得了此数据帧):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val rows1 = Seq(
      Row("1", Row("a", "b"), "8.00", Row("1","2")),
      Row("2", Row("c", "d"), "9.00", Row("3","4"))
    )

    val rows1Rdd = spark.sparkContext.parallelize(rows1, 4)

    val schema1 = StructType(
      Seq(
        StructField("id", StringType, true),
        StructField("s1", StructType(
          Seq(
            StructField("x", StringType, true),
            StructField("y", StringType, true)
          )
        ), true),
        StructField("d", StringType, true),
        StructField("s2", StructType(
          Seq(
            StructField("u", StringType, true),
            StructField("v", StringType, true)
          )
        ), true)
      )
    )

    val df1 = spark.createDataFrame(rows1Rdd, schema1)

    println("Schema with nested struct") …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

5
推荐指数
1
解决办法
7245
查看次数

如何使用selectExpr在spark数据帧中转换结构数组?

如何在火花数据帧中投射结构数组?

让我通过一个例子来解释我想要做什么。我们将首先创建一个包含行数组和嵌套行的数据框。我的整数尚未在数据框中进行转换,它们被创建为字符串:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rows1 = Seq(
  Row("1", Row("a", "b"), "8.00", Seq(Row("1","2"), Row("12","22"))),
  Row("2", Row("c", "d"), "9.00", Seq(Row("3","4"), Row("33","44")))
)

val rows1Rdd = spark.sparkContext.parallelize(rows1, 4)

val schema1 = StructType(
  Seq(
    StructField("id", StringType, true),
    StructField("s1", StructType(
      Seq(
        StructField("x", StringType, true),
        StructField("y", StringType, true)
      )
    ), true),
    StructField("d", StringType, true),
    StructField("s2", ArrayType(StructType(
      Seq(
        StructField("u", StringType, true),
        StructField("v", StringType, true)
      )
    )), true)
  )
)

val df1 = spark.createDataFrame(rows1Rdd, schema1)
Run Code Online (Sandbox Code Playgroud)

这是创建的数据框的架构:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rows1 = Seq(
  Row("1", …
Run Code Online (Sandbox Code Playgroud)

sql scala dataframe apache-spark apache-spark-sql

4
推荐指数
1
解决办法
6140
查看次数

如何使用 Spark Structured Streaming 逐块处理文件?

我正在处理大量文件,并且我想逐块处理这些文件,假设在每个批次期间,我想分别处理每 50 个文件。

如何使用 Spark 结构化流来做到这一点?

我看到Jacek Laskowski(/sf/users/91374111/)在类似的问题(Spark to process rdd chunk by chunk from json files and post to Kafka topic)中说,可以使用Spark 结构化流,但我找不到任何有关它的示例。

多谢,

scala apache-spark spark-structured-streaming

3
推荐指数
1
解决办法
5605
查看次数