rod*_*ers 5 apache-spark apache-spark-2.0 spark-structured-streaming
I am trying to pivot a Spark streaming Dataset (Structured Streaming), but I get an AnalysisException (excerpt below).
Could someone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest an alternative approach?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:…)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
tl;dr The pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.
As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach (see the sketch at the end of this answer).
I'm using the latest version of Spark at the time of writing, 2.4.4.
scala> spark.version
res0: String = 2.4.4
UnsupportedOperationChecker (which you can find in the stack trace) checks whether (the logical plan of) a streaming query uses supported operations only.
When you execute pivot, you have to do groupBy first, since that's the only interface that gives you access to pivot.
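For reference, this is the shape of the API in batch mode: groupBy returns a RelationalGroupedDataset, which is where pivot lives. In the sketch below, df and the column names are just placeholders, not code from the original question.

// Batch pivot for comparison; df is any non-streaming DataFrame
val pivoted = df
  .groupBy("value")
  .pivot("timestamp")
  .count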
There are two issues with pivot:

1. pivot wants to know how many columns to generate values for, and hence does a collect first, which is not possible with a streaming Dataset.
2. pivot is actually another aggregation (besides groupBy) that Spark Structured Streaming does not support.

Let's look at issue 1 first, where no columns to pivot on are defined.
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp") // <-- pivot with no values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
... 49 elided
The last two lines show the issue, i.e. pivot does a collect under the covers, and hence the issue.
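For reference, when no pivot values are given, RelationalGroupedDataset.pivot first computes the distinct values of the pivot column eagerly, roughly as follows (a paraphrase for illustration, not the exact Spark source):

// Paraphrased: pivot(pivotColumn) collects the distinct values of the column,
// which is an eager action and therefore fails for a streaming Dataset.
val values = df
  .select(pivotColumn)
  .distinct()
  .sort(pivotColumn)
  .collect()
  .map(_.get(0))
  .toSeq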
The other issue is that even if you specified the values to pivot on, you would then face the other problem, multiple streaming aggregations (and, as you can see below, this time the check is for streaming, not for batch as in the first case).
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp", Seq(1)) // <-- pivot with explicit values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
+- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
... 49 elided
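As mentioned at the top, the workaround is to pivot every micro-batch with DataStreamWriter.foreachBatch, where each batch is a regular (non-streaming) DataFrame. The following is a minimal sketch only: the rate source, the column names and the console output are placeholders, not code from the original question.

import org.apache.spark.sql.DataFrame

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Inside foreachBatch we have a plain batch DataFrame, so pivot is allowed.
    batch
      .groupBy("value")
      .pivot("timestamp")
      .count
      .show(truncate = false)
  }
  .start

Note that every micro-batch is pivoted independently, so the set of pivoted columns can differ from batch to batch; pass explicit values to pivot if you need a stable schema across batches.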