如何将数据帧作为 json 数组写入文件?(斯卡拉)

Ace*_*Ace 5 json scala dataframe apache-spark

我有一个数据框,我想将其作为 json 数组写入 scala 中的单个文件中。

尝试1:

dataframe.coalesce(1).write.format("json").save(destDir)
Run Code Online (Sandbox Code Playgroud)

输出1: 每行一行,其中每一行都是一个json

尝试2:

dataframe.toJSON.coalesce(1).write.format("json").save(destDir)
Run Code Online (Sandbox Code Playgroud)

输出 2: 与输出 1 相同,但每行都有一个看起来很奇怪的 json {value: {key1:value1, key2:value2, ... }

尝试 3(使用 java PrintWriter 写入字符串):

printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))
Run Code Online (Sandbox Code Playgroud)

输出3:

它将 json 数组写入本地路径。如果路径是 hdfs 的,即使路径 + 文件存在,它也会显示 FileNotFound。

Vin*_*oba 4

要将数据帧写入 json 数组,首先将数据帧转换为 json 字符串,然后转换这些字符串,以便每一行都是未来 json 文件中的一行,然后使用而text不是json

分析

要将数据帧写入 json,您可以从.toJSON尝试 2 和 3 中的方法开始:

val rawJson = dataframe.toJSON
Run Code Online (Sandbox Code Playgroud)

现在您有一个数据框,value其中一列包含行的 json 表示形式为String.

要将此数据框转换为每行代表未来文件的一行的数据框,您需要:

  • [添加包含数据框第一行的新行
  • 向表示 json 数据的所有行添加逗号
  • 除了最后一行 json 数据
  • ]添加包含数据框最后一行的新行

如您所见,“第一个”和“最后一个”等概念在您的情况下很重要,因此您需要在数据框中建立行的排序。你可以这样关联它:

+-------+--------------------+------------+
| order | row                | value      |
+-------+--------------------+------------+
| 0     | first row          | "["        |
| 1     | row with json      | "  {...}," | 
| 1     | row with json      | "  {...}," | 
| ...   | ...                | ...        |
| 1     | row with json      | "  {...}," |
| 1     | row with json      | "  {...}," |
| 2     | last row with json | "  {...}"  |
| 3     | last row           | "]"        |
+-------+--------------------+------------+
Run Code Online (Sandbox Code Playgroud)

首先,您可以将最后一行 json 与其他行区分开来。为此,您可以使用窗口函数。您计算窗口中包含当前行和下一行的行数,这意味着您将每一行与 2 关联,但最后一行没有下一行,因此与 1 关联。

+-------+--------------------+------------+
| order | row                | value      |
+-------+--------------------+------------+
| 0     | first row          | "["        |
| 1     | row with json      | "  {...}," | 
| 1     | row with json      | "  {...}," | 
| ...   | ...                | ...        |
| 1     | row with json      | "  {...}," |
| 1     | row with json      | "  {...}," |
| 2     | last row with json | "  {...}"  |
| 3     | last row           | "]"        |
+-------+--------------------+------------+
Run Code Online (Sandbox Code Playgroud)

但是,您希望最后一行在“order”列中具有 2,而其他行在“order”列中具有 1。您可以使用 modulo ( %) 函数来实现此目的:

val window = Window.rowsBetween(Window.currentRow, 1)

val jsonWindow = rawJson.withColumn("order", count("value").over(window))
Run Code Online (Sandbox Code Playgroud)

然后,向除最后一行之外的所有行添加逗号,这意味着向“order”列设置为 1 的所有行添加逗号:

val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)
Run Code Online (Sandbox Code Playgroud)

最终文件中的这些行将缩进,因此您缩进它们:

val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
Run Code Online (Sandbox Code Playgroud)

您添加第一行和最后一行,其中包含左方括号和右方括号:

val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit("  "), col("value")))
Run Code Online (Sandbox Code Playgroud)

您订购它:

val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
Run Code Online (Sandbox Code Playgroud)

您合并后只有一个分区,因为您最后只需要一个文件:

val orderedRows = unorderedRows.orderBy("order").drop("order")
Run Code Online (Sandbox Code Playgroud)

然后你把它写成文本

partitionedRows = orderedRows.coalesce(1)
Run Code Online (Sandbox Code Playgroud)

你就完成了!

完整的解决方案

这是完整的解决方案,带有导入。该解决方案适用于 Spark 2.3(使用 Spark 3.0 进行测试):

partitionedRows.write.text(destDir)
Run Code Online (Sandbox Code Playgroud)

结论

您可以仅使用 Spark 将 Spark 数据帧编写为 json 数组。

然而,spark 是一种并行计算框架,因此强制执行顺序并缩小到一个分区并不是它应该的工作方式。此外,由于你无法更改spark输出的文件名,因此保存的文件将具有.txt扩展名(但里面是一个json数组)

最好保存数据帧,.write.json(destDir)然后使用经典工具重新处理输出,而不是创建复杂的逻辑来使用 Spark 进行输出。