Flattening rows in Spark

Asked by Nir*_*cov · 41 votes · scala distributed-computing apache-spark apache-spark-sql

I am running some tests on Spark with Scala. We usually read JSON files that need to be manipulated, as in the following example:

test.json:

{"a":1,"b":[2,3]}
val test = sqlContext.read.json("test.json")

How can I transform it into the following format?

{"a":1,"b":2}
{"a":1,"b":3}

Answered by zer*_*323 · 76 votes

You can use the `explode` function:

scala> import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.explode


scala> val test = sqlContext.read.json(sc.parallelize(Seq("""{"a":1,"b":[2,3]}""")))
test: org.apache.spark.sql.DataFrame = [a: bigint, b: array<bigint>]

scala> test.printSchema
root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: long (containsNull = true)

scala> val flattened = test.withColumn("b", explode($"b"))
flattened: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint]

scala> flattened.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)

scala> flattened.show
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  1|  3|
+---+---+
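The semantics of `explode` can be illustrated without a Spark cluster: each element of the array column becomes its own row, and the other columns are duplicated alongside it. A minimal plain-Scala sketch of that behavior (the `Row` case class and sample data here are illustrative stand-ins, not Spark types):

```scala
// Stand-in for one JSON record {"a":1,"b":[2,3]} (not Spark's Row type)
case class Record(a: Long, b: Seq[Long])

val rows = Seq(Record(1L, Seq(2L, 3L)))

// flatMap repeats `a` once per element of `b`,
// mirroring what explode($"b") does to the DataFrame
val flattened = rows.flatMap(r => r.b.map(e => (r.a, e)))

println(flattened)  // List((1,2), (1,3))
```

In Spark itself the same per-row expansion happens inside the query engine, so the schema changes from `b: array<bigint>` to `b: bigint`, exactly as the `printSchema` output above shows.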

  • Try [`import sqlContext.implicits._`](https://github.com/apache/spark/blob/8ecba3e86e53834413da8b4299f5791545cae12e/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L349) to enable the `$"b"` column syntax. You can also use `org.apache.spark.sql.functions.col`, or select the column directly from the DataFrame with `df("b")`. (10 upvotes)