sji*_*han 3 json apache-spark pyspark databricks amazon-kinesis-firehose
我有一个将数据放入 S3 的 kinesis firehose 传输流。但是在数据文件中,json 对象之间没有分隔符。所以它看起来像这样,
{
"key1" : "value1",
"key2" : "value2"
}{
"key1" : "value1",
"key2" : "value2"
}
Run Code Online (Sandbox Code Playgroud)
在 Apache Spark 中,我这样做是为了读取数据文件,
df = spark.read.schema(schema).json(path, multiLine=True)
Run Code Online (Sandbox Code Playgroud)
这只能读取文件中的第一个 json 对象,其余的将被忽略,因为没有分隔符。
如何在 spark 中使用解决此问题?
您可以使用sparkContext的wholeTextFilesapi 将json 文件读入Tuple2(filename, whole text),将整个文本解析为 multiLine jsons,然后最终使用sqlContext将其作为json读取到 dataframe。
sqlContext\
.read\
.json(sc
.wholeTextFiles("path to your multiline json file")
.values()
.flatMap(lambda x: x
.replace("\n", "#!#")
.replace("{#!# ", "{")
.replace("#!#}", "}")
.replace(",#!#", ",")
.split("#!#")))\
.show()
Run Code Online (Sandbox Code Playgroud)
你应该得到dataframe的
+------+------+
| key1| key2|
+------+------+
|value1|value2|
|value1|value2|
+------+------+
Run Code Online (Sandbox Code Playgroud)
您可以根据需要修改代码
| 归档时间: |
|
| 查看次数: |
1631 次 |
| 最近记录: |