Iterate over each row of a DataFrame, store it in a val, and pass it as a parameter to a Spark SQL query

scala dataframe apache-spark apache-spark-sql

I am trying to read the rows of a lookup table (3 rows, 3 columns), iterate over them one by one, and pass the values from each row as parameters to a Spark SQL query.

DB | TBL   | COL
----------------
db | txn   | ID
db | sales | ID
db | fee   | ID

I tried this for a single row in spark-shell and it worked, but I am finding it hard to loop over all the rows.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Values for a single lookup row, hard-coded for now
val db_name: String = "db"
val tbl_name: String = "transaction"
val unique_col: String = "transaction_number"

// Duplicate check for that one table
val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")

How can I iterate over the rows and pass each row's values as parameters?

Answer:

The two approaches above may be correct in general, but I don't like collecting the data to the driver for performance reasons, especially when the data is large.
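For context, a collect()-based version of the idea (a minimal sketch, assuming the same lookup DataFrame with columns DB, TBL, and COL that is built further down) would look roughly like this; it is fine for a 3-row lookup table, but it pulls every row to the driver:

// Minimal sketch of the collect()-based approach (column names assumed: DB, TBL, COL)
val queries: Array[String] = lookup.collect().map { row =>
  s"select count(*), ${row.getAs[String]("COL")} from ${row.getAs[String]("DB")}.${row.getAs[String]("TBL")} " +
    s"group by ${row.getAs[String]("COL")} having count(*)>1"
}
val duplicates = queries.map(spark.sql).reduce(_ union _)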

org.apache.spark.util.CollectionAccumulator is a good fit for this kind of requirement... see the documentation.
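As a quick illustration of how a CollectionAccumulator behaves (a minimal sketch; the accumulator name and sample values here are made up):

// Register a CollectionAccumulator on the driver, add to it from the executors,
// then read the accumulated java.util.List back on the driver.
val acc = spark.sparkContext.collectionAccumulator[String]("demo-accumulator")
spark.sparkContext.parallelize(Seq("a", "b", "c")).foreach(acc.add)
println(acc.value) // contains "a", "b", "c" (order not guaranteed)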

Also, if the data is large, foreachPartition is again the right candidate for performance reasons.

Below is an implementation:

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConversions._
import scala.collection.mutable

object TableTest extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)


  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // Lookup table: one row per (database, table, key column) to check for duplicates
  val lookup =
    Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))
      .toDF("DB", "TBL", "COL")

  // Accumulator that carries the generated SQL strings from the executors back to the driver
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one duplicate-check query per lookup row; foreachPartition avoids collecting the lookup data
  lookup.foreachPartition { partition =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} " +
          s"group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Run each generated query and union the results into a single DataFrame
  val mycollectionOfSelects: mutable.Seq[String] = asScalaBuffer(collAcc.value)
  val finaldf = mycollectionOfSelects.map(spark.sql).reduce(_ union _)
  finaldf.show

}
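One small note on the asScalaBuffer call at the end: scala.collection.JavaConversions is deprecated in recent Scala versions. If that matters in your environment (an assumption about your Scala version, not something the answer requires), the equivalent with JavaConverters would be roughly:

import scala.collection.JavaConverters._

// collAcc.value is a java.util.List[String]; asScala exposes it as a Scala Buffer
val mycollectionOfSelects: mutable.Seq[String] = collAcc.value.asScala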


Sample output:

[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[Stage 0:>                                                          (0 + 0) / 2]
select count(*), transaction_number from db.txn group by ID having count(*)>1
select count(*), transaction_number from db.sales group by ID having count(*)>1
select count(*), transaction_number from db.fee group by ID having count(*)>1

Note: since these are pseudo tables, I have not shown the resulting DataFrames.