Ave*_*ell 6 arrays struct nested apache-spark pyspark
===========更新========
我在 JSON(struct_c 和 array_d)中添加了更多详细信息,以便更清楚地了解异常的位置。
===========================
再会,
我有一个带有结构类型嵌套数组的 Spark DataFrame。我想从该结构中选择一列,但收到错误消息:“ org.apache.spark.sql.AnalysisException:由于数据类型不匹配而无法解析 ' home
. array_a
. ['a']':参数 2 需要整数类型,array_b
然而 ''a'' 是字符串类型“.
这是我的数据:
{
"home": {
"a_number": 5,
"a_string": "six",
"array_a": [
{
"array_b": [{"a": "1", "b": 2}],
"struct_c": {"a": 1.1, "b": 1.3},
"array_d": ["a", "b", "c"]
},
{
"array_b": [{"a": "3", "b": 4}],
"struct_c": {"a": 1.5, "b": 1.6},
"array_d": ["x", "y", "z"]
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的数据架构:
mydf1 = spark.read.option("multiline", "true").json("myJson.json")
mydf1.printSchema()
root
|-- home: struct (nullable = true)
| |-- a_number: long (nullable = true)
| |-- a_string: string (nullable = true)
| |-- array_a: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- array_b: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- a: string (nullable = true)
| | | | | |-- b: long (nullable = true)
| | | |-- array_d: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- struct_c: struct (nullable = true)
| | | | |-- a: double (nullable = true)
| | | | |-- b: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
当我从 array_a 内的 struct_c 或 array_d (字符串数组)选择数据时,没有问题。
mydf1.select("home.array_a.array_d").show(10, False)
+----------------------+
|array_d |
+----------------------+
|[[a, b, c], [x, y, z]]|
+----------------------+
mydf1.select(col("home.array_a.struct_c.a").alias("struct_field_inside_arrayA")).show(10, False)
+--------------------------+
|struct_field_inside_arrayA|
+--------------------------+
|[1.1, 1.5] |
+--------------------------+
Run Code Online (Sandbox Code Playgroud)
这是它失败的地方:
mydf1.select("home.array_a.array_b.a").printSchema()
mydf1.select("home.array_a.array_b.a").show()
Run Code Online (Sandbox Code Playgroud)
我期望的是一个二维字符串数组([["1", "3"]] 是我的示例 JSON)
您能帮忙解释一下为什么失败吗?
感谢您的帮助。
无法执行第 4 行: mydf1.select("home.array_a.array_b.a").printSchema() 回溯(最近一次调用最后):文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark /sql/utils.py”,第 63 行,在装饰中返回 f(*a, **kw) 文件“/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol. py”,第 328 行,采用 get_return_value 格式(target_id,“.”,名称),值) py4j.protocol.Py4JJavaError:调用 o15300.select 时发生错误。:org.apache.spark.sql.AnalysisException:无法解析'
home
。array_a
。array_b
['a']' 由于数据类型不匹配:参数 2 需要整型,但是 ''a'' 是字符串类型。;; '项目 [home#18213.array_a.array_b[a] AS a#18217] +- 关系 [home#18213] json在 org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) 在 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply $3.applyOrElse(CheckAnalysis.scala:115) 位于 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107) 位于 org.apache。 Spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala :278)在org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)在org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:275) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode. scala:275) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326) 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode. scala:187)在org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)在org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93) 在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1。应用(QueryPlan.scala:93)在org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)在org.apache.spark.sql.catalyst.plans.QueryPlan$ $anonfun$1.apply(QueryPlan.scala:105) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 在 org.apache.spark.sql.catalyst.plans.QueryPlan .transformExpression$1(QueryPlan.scala:104) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121) 在 scala.collection.TraversableLike $$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizingArray) .scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 在 scala.collection.AbstractTraversable.map(Traversable. scala:104) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121) 在 org.apache.spark。 sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126) 在 org.apache.spark.sql.catalyst.trees.TreeNode。mapProductIterator(TreeNode.scala:187) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan. scala:93) 在 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:107) 在 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun $checkAnalysis$1.apply(CheckAnalysis.scala:85) 在 org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 在 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$ org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95) 位于 org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun 处的 class.checkAnalysis(CheckAnalysis.scala:85) $executeAndCheck$1.apply(Analyzer.scala:108) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105) 在 org.apache.spark.sql。 org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) 位于 org.apache.spark.sql.execution 处的catalyst.plans.逻辑.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201) .QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) 在 org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) 在 org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution) .scala:47) 在 org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset .scala:3407)在org.apache.spark.sql.Dataset.select(Dataset.scala:1335)在sun.reflect.GenerateMethodAccessor348.invoke(未知来源)在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 )在java.lang.reflect.Method.invoke(Method.java:498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)在py4j .Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection. java:238)在java.lang.Thread.run(Thread.java:748)sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108) 在 org.apache. Spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105) 在 org.apache.spark.sql.catalyst.plans.逻辑.AnalysisHelper$.markInAnalyzer(Analyzer.scala:201)在 org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) 在 org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) 在 org.apache .spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) 在 org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) 在 org.apache.spark.sql.Dataset$。 ofRows(Dataset.scala:79) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3407) 在 org.apache.spark.sql.Dataset.select (Dataset.scala:1335)在sun.reflect.GenerateMethodAccessor348.invoke(未知来源)在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在java.lang.reflect.Method.invoke(Method.java:498) )在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand .invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:第748章)sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108) 在 org.apache. Spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105) 在 org.apache.spark.sql.catalyst.plans.逻辑.AnalysisHelper$.markInAnalyzer(Analyzer.scala:201)在 org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) 在 org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) 在 org.apache .spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) 在 org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) 在 org.apache.spark.sql.Dataset$。 ofRows(Dataset.scala:79) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3407) 在 org.apache.spark.sql.Dataset.select (Dataset.scala:1335)在sun.reflect.GenerateMethodAccessor348.invoke(未知来源)在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在java.lang.reflect.Method.invoke(Method.java:498) )在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand .invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:第748章)357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection。运行(GatewayConnection.java:238)在java.lang.Thread.run(Thread.java:748)357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection。运行(GatewayConnection.java:238)在java.lang.Thread.run(Thread.java:748)
在处理上述异常的过程中,又出现了一个异常:
回溯(最近一次调用最后):文件“/tmp/zeppelin_pyspark-5197917387349583174.py”,第380行,在exec(code,_zcUserQueryNameSpace)文件“”,第4行,在文件“/usr/lib/spark/python/lib”中/pyspark.zip/pyspark/sql/dataframe.py”,第 1320 行,在 select jdf = self._jdf.select(self._jcols(*cols)) 文件“/usr/lib/spark/python/lib/py4j- 0.10.7-src.zip/py4j/java_gateway.py”,第 1257 行,呼叫 应答中,self.gateway_client,self.target_id,self.name)文件“/usr/lib/spark/python/lib/pyspark.zip /pyspark/sql/utils.py",第 69 行,在装饰中引发 AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: "无法解析 '
home
.array_a
.array_b
[' a']' 由于数据类型不匹配:参数 2 需要整数类型,但是 ''a'' 是字符串类型。;;\n'Project [home#18213.array_a.array_b[a] AS a#18217] \n+- 关系[home#18213] json\n"
由于您对element_at()函数没有问题,我假设您使用的是spark 2.4+,那么您可以尝试Spark SQL内置函数:transform [1] [2] + flatten:
>>> mydf1.selectExpr('flatten(transform(home.array_a.array_b, x -> x.a)) as array_field_inside_array').show()
+------------------------+
|array_field_inside_array|
+------------------------+
| [1, 3]|
+------------------------+
Run Code Online (Sandbox Code Playgroud)
我们使用transform()a
函数仅检索每个数组元素的字段值home.array_a.array_b
并将它们转换为数组[[1], [3]]
。然后将该数组展平为[1, 3]
. 如果您需要结果为[[1, 3]]
,则只需添加 array() 函数
array(flatten(transform(home.array_a.array_b, x -> x.a)))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
29861 次 |
最近记录: |