Why does Spark fail with "Detected cartesian product for INNER join between logical plans"?

Mar*_*ace 22 scala apache-spark apache-spark-sql

I'm using Spark 2.1.0.

When I execute the following code, I get an error from Spark. Why does it happen, and how can I fix it?

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")
weights.join(input, "sourceId").show

The error:

scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided

Sha*_*ala 30

You can make the inner join go through by turning on the flag:

spark.conf.set("spark.sql.crossJoin.enabled", "true")
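With the flag set in the same session, the original equi-join should then run as written. A minimal sketch, reusing the weights and input DataFrames from the question:

// With spark.sql.crossJoin.enabled set to true, the optimizer accepts
// the plan instead of throwing AnalysisException.
weights.join(input, "sourceId").show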

You can also use an explicit cross join (note that without a condition this returns the full cartesian product, not just the matching rows):

weights.crossJoin(input)

Or pass the join type explicitly while keeping the join condition:

weights.join(input, input("sourceId")===weights("sourceId"), "cross")
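Because the equality condition is kept here, the "cross" join type only tells the planner that a cartesian plan is acceptable; the rows returned should be the same as the intended inner join. An equivalent sketch, cross joining first and filtering afterwards (column references assume the weights and input DataFrames from the question):

// Cross join first, then keep only the matching sourceId pairs;
// semantically this matches the blocked inner equi-join.
weights.crossJoin(input)
  .where(weights("sourceId") === input("sourceId"))
  .show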

You can find more information in SPARK-6459, which is fixed in 2.1.1.

Once you are on 2.1.1, the problem should already be resolved.

Hope this helps!

  • I don't need a cross join. I need an inner join. (2 upvotes)

Jac*_*ski 5

tl;dr Upgrade to Spark 2.1.1. It is an issue that was fixed in Spark.

(I really wish I could also tell you the exact change that fixed it in 2.1.1.)

  • Do you know how the issue was fixed? I have Spark 2.2 and I still get this problem: Spark misinterprets a regular join as a cartesian product. (13 upvotes)
  • I'm also still hitting this on Spark 2.2, in a case where there is actually no cartesian product (or where a join should simply be inferred). Unless there is a documented fix, upgrading is not a good solution. (3 upvotes)