Pra*_*san 1 scala user-defined-functions apache-spark apache-spark-sql
我正在使用Spark 1.6,我想知道如何在数据帧中实现查找.
我有两个数据框架员工和部门.
员工数据框架
-------------------
Emp Id | Emp Name
------------------
1 | john
2 | David
Run Code Online (Sandbox Code Playgroud)部门数据框
--------------------
Dept Id | Dept Name | Emp Id
-----------------------------
1 | Admin | 1
2 | HR | 2
Run Code Online (Sandbox Code Playgroud)我想从employee表中查找emp id到department表并获取dept名称.所以,结果集将是
Emp Id | Dept Name
-------------------
1 | Admin
2 | HR
Run Code Online (Sandbox Code Playgroud)
如何在SPARK中实现此查找UDF功能.我不想在两个数据帧上使用JOIN.
正如评论中已经提到的那样,加入数据帧是最佳选择.
您可以使用查找,但我认为没有"分布式"解决方案,即您必须将查找表收集到驱动程序内存中.另请注意,此方法假定EmpID是唯一的:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
.withColumn("empNames",lookup(lookupMap)($"EmpID"))
Run Code Online (Sandbox Code Playgroud)
我最初的想法是传递empRdd给UDF并使用lookup定义的方法PairRDD,但这当然不起作用,因为你不能lookup在转换(即UDF)中有spark动作(即).
编辑:
如果你的empDf有多个列(例如Name,Age),你可以使用它
val empRdd = empDf.rdd.map{row =>
(row.getInt(0),(row.getString(1),row.getInt(2)))}
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,(String,Int)]) =
udf((empID:Int) => lookupMap.lift(empID))
depsDF
.withColumn("lookup",lookup(lookupMap)($"EmpID"))
.withColumn("empName",$"lookup._1")
.withColumn("empAge",$"lookup._2")
.drop($"lookup")
.show()
Run Code Online (Sandbox Code Playgroud)