Ace*_*Ace 5 json scala dataframe apache-spark
我有一个数据框,我想将其作为 json 数组写入 scala 中的单个文件中。
dataframe.coalesce(1).write.format("json").save(destDir)
Run Code Online (Sandbox Code Playgroud)
输出1: 每行一行,其中每一行都是一个json
dataframe.toJSON.coalesce(1).write.format("json").save(destDir)
Run Code Online (Sandbox Code Playgroud)
输出 2: 与输出 1 相同,但每行都有一个看起来很奇怪的 json {value: {key1:value1, key2:value2, ... }
printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))
Run Code Online (Sandbox Code Playgroud)
输出3:
它将 json 数组写入本地路径。如果路径是 hdfs 的,即使路径 + 文件存在,它也会显示 FileNotFound。
要将数据帧写入 json 数组,首先将数据帧转换为 json 字符串,然后转换这些字符串,以便每一行都是未来 json 文件中的一行,然后使用而text不是json
要将数据帧写入 json,您可以从.toJSON尝试 2 和 3 中的方法开始:
val rawJson = dataframe.toJSON
Run Code Online (Sandbox Code Playgroud)
现在您有一个数据框,value其中一列包含行的 json 表示形式为String.
要将此数据框转换为每行代表未来文件的一行的数据框,您需要:
[添加包含数据框第一行的新行]添加包含数据框最后一行的新行如您所见,“第一个”和“最后一个”等概念在您的情况下很重要,因此您需要在数据框中建立行的排序。你可以这样关联它:
+-------+--------------------+------------+
| order | row | value |
+-------+--------------------+------------+
| 0 | first row | "[" |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| ... | ... | ... |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| 2 | last row with json | " {...}" |
| 3 | last row | "]" |
+-------+--------------------+------------+
Run Code Online (Sandbox Code Playgroud)
首先,您可以将最后一行 json 与其他行区分开来。为此,您可以使用窗口函数。您计算窗口中包含当前行和下一行的行数,这意味着您将每一行与 2 关联,但最后一行没有下一行,因此与 1 关联。
+-------+--------------------+------------+
| order | row | value |
+-------+--------------------+------------+
| 0 | first row | "[" |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| ... | ... | ... |
| 1 | row with json | " {...}," |
| 1 | row with json | " {...}," |
| 2 | last row with json | " {...}" |
| 3 | last row | "]" |
+-------+--------------------+------------+
Run Code Online (Sandbox Code Playgroud)
但是,您希望最后一行在“order”列中具有 2,而其他行在“order”列中具有 1。您可以使用 modulo ( %) 函数来实现此目的:
val window = Window.rowsBetween(Window.currentRow, 1)
val jsonWindow = rawJson.withColumn("order", count("value").over(window))
Run Code Online (Sandbox Code Playgroud)
然后,向除最后一行之外的所有行添加逗号,这意味着向“order”列设置为 1 的所有行添加逗号:
val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)
Run Code Online (Sandbox Code Playgroud)
最终文件中的这些行将缩进,因此您缩进它们:
val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
Run Code Online (Sandbox Code Playgroud)
您添加第一行和最后一行,其中包含左方括号和右方括号:
val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit(" "), col("value")))
Run Code Online (Sandbox Code Playgroud)
您订购它:
val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
Run Code Online (Sandbox Code Playgroud)
您合并后只有一个分区,因为您最后只需要一个文件:
val orderedRows = unorderedRows.orderBy("order").drop("order")
Run Code Online (Sandbox Code Playgroud)
然后你把它写成文本:
partitionedRows = orderedRows.coalesce(1)
Run Code Online (Sandbox Code Playgroud)
你就完成了!
这是完整的解决方案,带有导入。该解决方案适用于 Spark 2.3(使用 Spark 3.0 进行测试):
partitionedRows.write.text(destDir)
Run Code Online (Sandbox Code Playgroud)
您可以仅使用 Spark 将 Spark 数据帧编写为 json 数组。
然而,spark 是一种并行计算框架,因此强制执行顺序并缩小到一个分区并不是它应该的工作方式。此外,由于你无法更改spark输出的文件名,因此保存的文件将具有.txt扩展名(但里面是一个json数组)
最好保存数据帧,.write.json(destDir)然后使用经典工具重新处理输出,而不是创建复杂的逻辑来使用 Spark 进行输出。