将JSON文件读入Spark数据集并从单独的Map添加列

sme*_*eeb 8 json scala apache-spark apache-spark-sql apache-spark-dataset

Spark 2.1和Scala 2.11在这里.我有一个大的Map[String,Date],它有10K键/值对.我还有10K JSON文件存在于Spark可访问的文件系统中:

mnt/
    some/
        path/
            data00001.json
            data00002.json
            data00003.json
            ...
            data10000.json
Run Code Online (Sandbox Code Playgroud)

映射中的每个KV对对应于其各自的JSON文件(因此第一个映射KV对对应于data00001.json,等等)

我想将所有这些JSON文件读入1个大型Spark中Dataset,当我在它时,向该数据集添加两个新列(JSON文件中不存在).每个映射键都是第一个新列的值,每个键的值将是第二个新列的值:

val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
  val keyComponents = dataFile.getKey.split("/")
  val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
  (parent, dataFile.getLastModified)
})

// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val allDataDataset = spark.read.json("mnt/some/path/*.json")
  .withColumn("new_col_1", dataDirectories._1)
  .withColumn("new_col_2", dataDirectories._2)
Run Code Online (Sandbox Code Playgroud)

我已经确认,mnt/some/path/*.json当我删除withColumn方法并执行操作时,Spark将遵循通配符()并将所有JSON文件读入单个数据集allData.show().所以我在那里都很好.

我正在努力的是:如何添加两个新列,然后正确地拔出所有键/值映射元素?

dum*_*tru 4

如果我理解正确的话,你想将地图中的 KV 与 json 文件中的数据帧关联起来。

我将尝试将问题简化为仅 3 个文件和 3 个键值全部排序。

val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3)
val files = List("data0001.json", "data0002.json", "data0003.json")
Run Code Online (Sandbox Code Playgroud)

定义一个案例类来处理更简单的文件、键、值

case class FileWithKV(fileName: String, key: String, value: Int)
Run Code Online (Sandbox Code Playgroud)

将压缩文件和 kvs

val filesWithKVs = files.zip(kvs)
  .map(p => FileWithKV(p._1, p._2._1, p._2._2))
Run Code Online (Sandbox Code Playgroud)

它会看起来像这样

filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3))
Run Code Online (Sandbox Code Playgroud)

然后,我们从集合头部开始的初始数据框开始,然后开始向左折叠以构造包含所有文件的整个数据框,所有列都是从 KV 动态生成的

val head = filesWithKVs.head
val initialDf = spark
.read.json(head.filename)
.withColumn(s"new_col_1", lit(head.key)) 
.withColumn(s"new_col_2", lit(head.value))
Run Code Online (Sandbox Code Playgroud)

现在是折叠部分

val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => {
    val newDf = spark
    .read.json(fileWithKV.filename)
    .withColumn(s"new_col_1", lit(fileWithKV.key)) 
    .withColumn(s"new_col_2", lit(fileWithKV.value))
    // union the dataframes to capture file by file, key value with key value
    df.union(newDf)
})
Run Code Online (Sandbox Code Playgroud)

数据框将如下所示,假设 json 文件中的 3 个 json 文件中的每一个都有一个名为 bar 的列和一个值 foo

+---+----------+----------+
|bar|new_col_1 |new_col_2 |
+---+----------+----------+
|foo|         a|         1|
|foo|         b|         2|
|foo|         c|         3|
+---+----------+----------+
Run Code Online (Sandbox Code Playgroud)

  • 我将添加我对使用“Dataframe.union”的通常评论:不要执行超过几千次。如果文件数量很大,执行图将变得大得不切实际。当前的 hacky 替代方案是在底层 RDD 上使用“SparkContext.union(RDD*)”。这会使 DAG 变平。 (3认同)