mor*_*ayr 5 union scala apache-spark spark-dataframe
我有以下两个JSON文件:
{
"name" : "Agent1",
"age" : "32",
"details" : [{
"d1" : 1,
"d2" : 2
}
]
}
{
"name" : "Agent2",
"age" : "42",
"details" : []
}
Run Code Online (Sandbox Code Playgroud)
我用火花读了它们:
val jsonDf1 = spark.read.json(pathToJson1)
val jsonDf2 = spark.read.json(pathToJson2)
Run Code Online (Sandbox Code Playgroud)
使用以下模式创建两个数据帧:
root
|-- age: string (nullable = true)
|-- details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- d1: long (nullable = true)
| | |-- d2: long (nullable = true)
|-- name: string (nullable = true)
root
|-- age: string (nullable = true)
|-- details: array (nullable = true)
| |-- element: string (containsNull = true)
|-- name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
当我尝试使用这两个数据帧执行联合时,我收到此错误:
jsonDf1.union(jsonDf2)
org.apache.spark.sql.AnalysisException: unresolved operator 'Union;;
'Union
:- LogicalRDD [age#0, details#1, name#2]
+- LogicalRDD [age#7, details#8, name#9]
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?我将在JSON文件中有时会在加载spark作业时获得空数组,但它仍然需要统一它们,这应该不是问题,因为Json文件的模式是相同的.
如果您尝试合并 2 个数据帧,您将得到:
error:org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. ArrayType(StringType,true) <> ArrayType(StructType(StructField(d1,StringType,true), StructField(d2,StringType,true)),true) at the second column of the second table
为了解决这个问题,如果你可以同时读取 JSON,我会建议:
val jsonDf1 = spark.read.json("json1.json", "json2.json")
这将给出这个架构:
jsonDf1.printSchema
|-- age: string (nullable = true)
|-- details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- d1: long (nullable = true)
| | |-- d2: long (nullable = true)
|-- name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
数据输出
jsonDf1.show(10,truncate = false)
+---+-------+------+
|age|details|name |
+---+-------+------+
|32 |[[1,2]]|Agent1|
|42 |null |Agent2|
+---+-------+------+
Run Code Online (Sandbox Code Playgroud)
如果您的 json 在不同时间到达,作为默认解决方案,我建议您读取具有完整数组的模板 JSON 对象,这将使您的数据框具有可能对任何联合都有效的空数组。然后,在输出结果之前,您将使用过滤器删除此假 JSON:
val df = spark.read.json("jsonWithMaybeAnEmptyArray.json",
"TemplateFakeJsonWithAFullArray.json")
df.filter($"name" !== "FakeAgent").show(1)
Run Code Online (Sandbox Code Playgroud)
请注意:已开放 Jira 卡以提高合并 SQL 数据类型的能力:https : //issues.apache.org/jira/browse/SPARK-19536,这种操作应该可以在下一个 Spark 版本中实现。
polomarcus 的回答让我找到了这个解决方案:我无法一次读取所有文件,因为我得到了一个文件列表作为输入,而 Spark 没有接收路径列表的 API,但显然使用 Scala 可以做这个:
val files = List("path1", "path2", "path3")
val dataframe = spark.read.json(files: _*)
Run Code Online (Sandbox Code Playgroud)
这样我就得到了一个包含所有三个文件的数据框。
| 归档时间: |
|
| 查看次数: |
1209 次 |
| 最近记录: |