每个循环嵌套两个 DataFrame

use*_*922 2 scala apache-spark apache-spark-sql

foreachDataFrams的Loop 嵌套迭代会抛出 NullPointerException:

def nestedDataFrame(leftDF: DataFrame, riteDF: DataFrame): Unit = {    
    val leftCols: Array[String] = leftDF.columns
    val riteCols: Array[String] = riteDF.columns

    leftCols.foreach { ltColName =>
        leftDF.select(ltColName).foreach { ltRow =>
            val leftString = ltRow.apply(0).toString();
            // Works ... But Same Kind Of Code Below
            riteCols.foreach { rtColName =>
              riteDF.select(rtColName).foreach { rtRow => //Exception
              val riteString = rtRow.apply(0).toString();
              print(leftString.equals(riteString)
            }
        }
    }

  }
Run Code Online (Sandbox Code Playgroud)

例外:

java.lang.NullPointerException at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:77) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset) .scala:3406) 在 org.apache.spark.sql.Dataset.select(Dataset.scala:1334) 在 org.apache.spark.sql.Dataset.select(Dataset.scala:1352)

可能出了什么问题以及如何解决?

小智 5

leftDF.select(ltColName).foreach { ltRow =>
Run Code Online (Sandbox Code Playgroud)

上面的行将您的代码作为执行程序的任务放入 foreach 块中。现在riteDF.select(rtColName).foreach { rtRow =>,您正在尝试访问不允许的执行程序中的 Spark 会话。Spark 会话仅在驱动程序端可用。在该ofRow方法中,它尝试访问sparkSession

val qe = sparkSession.sessionState.executePlan(logicalPlan)
Run Code Online (Sandbox Code Playgroud)

您不能像使用常规 Java/Scala 集合一样使用数据集集合,而应该通过可用于完成任务的 api 使用它们,例如您可以加入它们以关联日期。


在这种情况下,您可以通过多种方式完成比较。您可以加入 2 个数据集,例如,

var joinedDf = leftDF.select(ltColName).join(riteDF.select(rtColName), $"ltColName" === $"rtColName", "inner")
Run Code Online (Sandbox Code Playgroud)

然后分析joinedDf. 您甚至可以intersect()使用两个数据集。