pyspark:将schemaRDD保存为json文件

dp0*_*377 6 python json apache-spark

我正在寻找一种方法将数据从Apache Spark导出到JSON格式的各种其他工具.我认为必须有一种非常简单的方法来做到这一点.

示例:我有以下JSON文件'jfile.json':

{"key":value_a1, "key2":value_b1},
{"key":value_a2, "key2":value_b2},
{...}
Run Code Online (Sandbox Code Playgroud)

其中文件的每一行都是JSON对象.这些文件可以很容易地读入PySpark

jsonRDD = jsonFile('jfile.json')
Run Code Online (Sandbox Code Playgroud)

然后看起来像(通过调用jsonRDD.collect()):

[Row(key=value_a1, key2=value_b1),Row(key=value_a2, key2=value_b2)]
Run Code Online (Sandbox Code Playgroud)

现在我想将这些类型的文件保存回纯JSON文件.

我在Spark用户列表中找到了这个条目:

http://apache-spark-user-list.1001560.n3.nabble.com/Updating-exising-JSON-files-td12211.html

宣称使用

RDD.saveAsTextFile(jsonRDD) 
Run Code Online (Sandbox Code Playgroud)

执行此操作后,文本文件看起来像

Row(key=value_a1, key2=value_b1)
Row(key=value_a2, key2=value_b2)
Run Code Online (Sandbox Code Playgroud)

,即jsonRDD刚刚写入文件.在阅读Spark用户列表条目后,我本来期望一种"自动"转换回JSON格式.我的目标是在开头提到一个看起来像'jfile.json'的文件.

我错过了一个非常明显的简单方法吗?

我阅读了http://spark.apache.org/docs/latest/programming-guide.html,搜索了谷歌,用户列表和堆栈溢出的答案,但几乎所有答案都涉及阅读和解析JSON到Spark.我甚至买了"学习星火"这本书,但那里的例子(第71页)只是导致了与上面相同的输出文件.

有人可以帮帮我吗?我觉得我在这里只缺少一个小链接

欢呼并提前致谢!

jeg*_*don 5

您可以使用方法toJson(),它允许您将SchemaRDD转换为JSON文档的MappedRDD.

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=tojson#pyspark.sql.SchemaRDD.toJSON


Spi*_*lov 1

我看不出有什么简单的方法可以做到这一点。一种解决方案是将 的每个元素转换SchemaRDD为 a String,最后得到一个RDD[String],其中每个元素都是该行的 JSON 格式。因此,您需要编写自己的 JSON 序列化器。这是最简单的部分。它可能不是超级快,但它应该并行工作,并且您已经知道如何将其保存RDD到文本文件。

SchemaRDD关键的见解是您可以通过调用该方法来获取模式的表示schema。那么Rowmap交给你的每一个都需要结合schema进行递归遍历。这实际上是平面 JSON 的串联列表遍历,但您可能还需要考虑嵌套 JSON。

剩下的只是 Python 的一个小问题,我不讲,但我确实在 Scala 中做了这个工作,以防它对你有帮助。Scala 代码变得密集的部分实际上并不依赖于深厚的 Spark 知识,因此如果您能够理解基本的递归并了解 Python,您应该能够使其工作。您的大部分工作是弄清楚如何在 Python API 中使用 apyspark.sql.Row和 a 。pyspark.sql.StructType

需要注意的是:我很确定我的代码在缺少值的情况下还无法工作——该formatItem方法需要处理 null 元素。

编辑:Spark 1.2.0中,该toJSON方法被引入到SchemaRDD,使这个问题变得更加简单 - 请参阅@jegordon 的答案。