ren*_*key 3 scala apache-spark
我是scala和spark的新手,我一直在努力寻找这个问题的解决方案 - 它正在努力.我已经尝试了以下代码的20种不同变体并type mismatch
在我尝试时遇到错误对列执行计算.
我有一个spark数据帧,我希望检查特定列中的每个字符串是否包含来自预定义List
(或Set
)单词的任意数量的单词.
以下是一些用于复制的示例数据:
// sample data frame
val df = Seq(
(1, "foo"),
(2, "barrio"),
(3, "gitten"),
(4, "baa")).toDF("id", "words")
// dictionary Set of words to check
val dict = Set("foo","bar","baaad")
Run Code Online (Sandbox Code Playgroud)
现在,我正在尝试使用比较结果创建第三列,以查看其中$"words"
列中的字符串是否包含单词集中的任何dict
单词.所以结果应该是:
+---+-----------+-------------+
| id| words| word_check|
+---+-----------+-------------+
| 1| foo| true|
| 2| bario| true|
| 3| gitten| false|
| 4| baa| false|
+---+-----------+-------------+
Run Code Online (Sandbox Code Playgroud)
首先,我试图看看我是否可以在不使用UDF的情况下本地执行,因为dict Set实际上是一个> 40K字的大型字典,据我所知,这将比UDF更有效:
df.withColumn("word_check", dict.exists(d => $"words".contains(d)))
Run Code Online (Sandbox Code Playgroud)
但我得到错误:
type mismatch;
found : org.apache.spark.sql.Column
required: Boolean
Run Code Online (Sandbox Code Playgroud)
我还尝试创建一个UDF来执行此操作(使用两者mutable.Set
并mutable.WrappedArray
描述Set - 不确定哪个是正确的但不起作用):
val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) = (col: String, array: scala.collection.mutable.Set[String] ) => array.exists(d => col.contains(d))
val udf1 = udf(checker)
df.withColumn("word_check", udf1($"words", dict )).show()
Run Code Online (Sandbox Code Playgroud)
但得到另一种类型不匹配:
found : scala.collection.immutable.Set[String]
required: org.apache.spark.sql.Column
Run Code Online (Sandbox Code Playgroud)
如果集合是固定数字,我应该能够Lit(Int)
在表达式中使用?但我真的不明白通过在scala中混合使用不同的数据类型来在列上执行更复杂的函数.
任何帮助都非常感激,特别是如果它可以有效地完成(它是一个大于5米行的df).
以下是使用UDF执行此操作的方法:
val checkerUdf = udf { (s: String) => dict.exists(s.contains(_)) }
df.withColumn("word_check", checkerUdf($"words")).show()
Run Code Online (Sandbox Code Playgroud)
您实现中的错误是您创建了一个期望两个参数的UDF,这意味着您Column
在应用它时必须传递两个- 但dict
不是Column
在您的DataFrame中,而是本地可变.
无论效率如何,这似乎都有效:
df.withColumn("word_check", dict.foldLeft(lit(false))((a, b) => a || locate(b, $"words") > 0)).show
+---+------+----------+
| id| words|word_check|
+---+------+----------+
| 1| foo| true|
| 2|barrio| true|
| 3|gitten| false|
| 4| baa| false|
+---+------+----------+
Run Code Online (Sandbox Code Playgroud)
如果你的字典很大,你不应该只在你的 udf 中引用它,因为每个任务的整个字典都会通过网络发送。我会结合 udf 广播你的 dict:
import org.apache.spark.broadcast.Broadcast
def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = {
udf {(s: String) => words.value.exists(s.contains(_))}
}
df.withColumn("word_check", udf_check(sparkContext.broadcast(dict))($"words"))
Run Code Online (Sandbox Code Playgroud)
或者,您也可以使用联接:
val dict_df = dict.toList.toDF("word")
df
.join(broadcast(dict_df),$"words".contains($"word"),"left")
.withColumn("word_check",$"word".isNotNull)
.drop($"word")
Run Code Online (Sandbox Code Playgroud)