scala dataframe apache-spark apache-spark-sql
I am trying to fetch the rows of a lookup table (3 rows, 3 columns), iterate over them row by row, and pass the values from each row as parameters to Spark SQL.
DB | TBL | COL
----------------
db | txn | ID
db | sales | ID
db | fee | ID
I tried a single row in the spark shell and it worked, but I am finding it difficult to loop over all the rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val db_name:String = "db"
val tbl_name:String = "transaction"
val unique_col:String = "transaction_number"
val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")
Could you let me know how to iterate over the rows and pass the values as parameters?
The two approaches above may be correct in general, but I don't like collecting the data to the driver for performance reasons, especially when the data is large.
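For comparison, the collect-based route (which the accumulator approach below avoids) can be sketched without a cluster. This is a minimal sketch: the tuples below stand in for what `lookup.collect()` would return, and the query strings are built on the driver:

```scala
// stand-in for lookup.collect(): each tuple is (DB, TBL, COL)
val rows = Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))

// build one duplicate-check query per lookup row, on the driver
val queries = rows.map { case (db, tbl, col) =>
  s"select count(*), transaction_number from $db.$tbl group by $col having count(*)>1"
}
queries.foreach(println)
// with a SparkSession in scope, the queries could then be run and unioned:
//   queries.map(spark.sql).reduce(_ union _)
```

This works, but every lookup row passes through the driver, which is the bottleneck the accumulator-based version sidesteps.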
`org.apache.spark.util.CollectionAccumulator` is the right candidate for this kind of requirement; see the documentation. Additionally, if the data is large, `foreachPartition` is again the right candidate for performance reasons. Below is the implementation:
package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConverters._
import scala.collection.mutable

object TableTest extends App {

  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // lookup table: one row per (database, table, key column)
  val lookup =
    Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))
      .toDF("DB", "TBL", "COL")

  // accumulator gathers the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // build one query per lookup row, in parallel across partitions
  lookup.foreachPartition { partition =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // back on the driver: run each query and union the results
  val mycollectionOfSelects: mutable.Seq[String] = collAcc.value.asScala
  val finaldf = mycollectionOfSelects.map(spark.sql).reduce(_ union _)
  finaldf.show
}
Sample result:
[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[Stage 0:> (0 + 0) / 2]
select count(*), transaction_number from db.txn group by ID having count(*)>1
select count(*), transaction_number from db.sales group by ID having count(*)>1
select count(*), transaction_number from db.fee group by ID having count(*)>1
Note: since these are dummy tables, I am not showing the resulting dataframe.