Spark如何使用带有Join的UDF

Jea*_*ean 9 join user-defined-functions dataframe apache-spark

我想使用特定UDF的使用Spark

这是计划:

我有一个table A(1000万行)和一个table B(1500万行)

我想使用UDF比较一个元素table A和一个table B 是否可能

这是我的代码示例.在某些时候我还需要说我的UDF比较必须大于0,9:

DataFrame dfr = df
                .select("name", "firstname", "adress1", "city1","compare(adress1,adress2)")
                .join(dfa,df.col("adress1").equalTo(dfa.col("adress2"))
                        .and((df.col("city1").equalTo(dfa.col("city2"))
                                ...;
Run Code Online (Sandbox Code Playgroud)

可能吗 ?

T. *_*ęda 5

是的你可以。但是,由于Spark无法进行谓词下推,因此它将比普通运算符要慢

例:

val udf = udf((x : String, y : String) => { here compute similarity; });
val df3 = df1.join(df2, udf(df1.field1, df2.field1) > 0.9)
Run Code Online (Sandbox Code Playgroud)

例如:

val df1 = Seq (1, 2, 3, 4).toDF("x")
val df2 = Seq(1, 3, 7, 11).toDF("q")
val udf = org.apache.spark.sql.functions.udf((x : Int, q : Int) => { Math.abs(x - q); });
val df3 = df1.join(df2, udf(df1("x"), df2("q")) > 1)
Run Code Online (Sandbox Code Playgroud)

您也可以直接从用户定义函数返回布尔值