如何在Spark中读取嵌套集合

Tag*_*gar 18 nested apache-spark parquet apache-spark-sql lateral-join

我有一张镶有桌子的镶木桌子

,array <struct <col1,col2,.. colN >>

可以使用LATERAL VIEW语法在Hive中对此表运行查询.

如何将此表读入RDD,更重要的是如何在Spark中过滤,映射等嵌套集合?

在Spark文档中找不到对此的任何引用.提前感谢您的任何信息!

PS.感觉可能有助于在桌子上给出一些统计数据.主表~600中的列数.行数~200m.嵌套集合中的"列"数〜10.平均集合中的平均记录数~35.

Lom*_*ard 20

在嵌套集合的情况下没有魔力.Spark将以同样的方式处理a RDD[(String, String)]和a RDD[(String, Seq[String])].

但是,从Parquet文件中读取这样的嵌套集合可能会很棘手.

我们来自spark-shell(1.3.1)的例子:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Run Code Online (Sandbox Code Playgroud)

写下镶木地板文件:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")
Run Code Online (Sandbox Code Playgroud)

阅读镶木地板文件:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
Run Code Online (Sandbox Code Playgroud)

重要的是row.getAs[Seq[Row]](1).嵌套序列的内部表示structArrayBuffer[Row],您可以使用它的任何超类型而不是Seq[Row].这1是外行中的列索引.我在getAs这里使用了这个方法,但最新版本的Spark还有其他选择.请参阅Row特征的源代码.

现在您已经拥有了RDD[Outer],您可以应用任何想要的转换或操作.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
Run Code Online (Sandbox Code Playgroud)

请注意,我们仅使用spark-SQL库来读取镶木地板文件.例如,您可以在将数据映射到RDD之前直接在DataFrame上选择所需的列.

dataFrame.select('col1, 'col2).map { row => ... }
Run Code Online (Sandbox Code Playgroud)


dnl*_*rky 8

我会给出一个基于Python的答案,因为我正在使用它.我认为Scala有类似的东西.

explode根据Python API文档,该函数已添加到Spark 1.4.0中以处理DataFrames中的嵌套数组.

创建测试数据框:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+
Run Code Online (Sandbox Code Playgroud)

使用explode扁平化列表列:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+
Run Code Online (Sandbox Code Playgroud)