Spark(scala)数据帧 - 检查列中的字符串是否包含集合中的任何项目

ren*_*key 3 scala apache-spark

我是scala和spark的新手,我一直在努力寻找这个问题的解决方案 - 它正在努力.我已经尝试了以下代码的20种不同变体并type mismatch在我尝试时遇到错误对列执行计算.

我有一个spark数据帧,我希望检查特定列中的每个字符串是否包含来自预定义List(或Set)单词的任意数量的单词.

以下是一些用于复制的示例数据:

// sample data frame
val df = Seq(
      (1, "foo"),
      (2, "barrio"),
      (3, "gitten"),
      (4, "baa")).toDF("id", "words")

// dictionary Set of words to check 
val dict = Set("foo","bar","baaad")
Run Code Online (Sandbox Code Playgroud)

现在,我正在尝试使用比较结果创建第三列,以查看其中$"words"列中的字符串是否包含单词集中的任何dict单词.所以结果应该是:

+---+-----------+-------------+
| id|      words|   word_check| 
+---+-----------+-------------+
|  1|        foo|         true|     
|  2|      bario|         true|
|  3|     gitten|        false|
|  4|        baa|        false|
+---+-----------+-------------+
Run Code Online (Sandbox Code Playgroud)

首先,我试图看看我是否可以在不使用UDF的情况下本地执行,因为dict Set实际上是一个> 40K字的大型字典,据我所知,这将比UDF更有效:

df.withColumn("word_check", dict.exists(d => $"words".contains(d)))
Run Code Online (Sandbox Code Playgroud)

但我得到错误:

type mismatch;
found   : org.apache.spark.sql.Column
required: Boolean
Run Code Online (Sandbox Code Playgroud)

我还尝试创建一个UDF来执行此操作(使用两者mutable.Setmutable.WrappedArray描述Set - 不确定哪个是正确的但不起作用):

val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) = (col: String, array: scala.collection.mutable.Set[String] ) =>  array.exists(d => col.contains(d))

val udf1 = udf(checker)

df.withColumn("word_check", udf1($"words", dict )).show()
Run Code Online (Sandbox Code Playgroud)

但得到另一种类型不匹配:

 found   : scala.collection.immutable.Set[String]
 required: org.apache.spark.sql.Column
Run Code Online (Sandbox Code Playgroud)

如果集合是固定数字,我应该能够Lit(Int)在表达式中使用?但我真的不明白通过在scala中混合使用不同的数据类型来在列上执行更复杂的函数.

任何帮助都非常感激,特别是如果它可以有效地完成(它是一个大于5米行的df).

Tza*_*har 7

以下是使用UDF执行此操作的方法:

val checkerUdf = udf { (s: String) => dict.exists(s.contains(_)) }

df.withColumn("word_check", checkerUdf($"words")).show()
Run Code Online (Sandbox Code Playgroud)

您实现中的错误是您创建了一个期望两个参数的UDF,这意味着您Column在应用它时必须传递两个- 但dict不是Column在您的DataFrame中,而是本地可变.


Psi*_*dom 6

无论效率如何,这似乎都有效:

df.withColumn("word_check", dict.foldLeft(lit(false))((a, b) => a || locate(b, $"words") > 0)).show

+---+------+----------+
| id| words|word_check|
+---+------+----------+
|  1|   foo|      true|
|  2|barrio|      true|
|  3|gitten|     false|
|  4|   baa|     false|
+---+------+----------+
Run Code Online (Sandbox Code Playgroud)


Rap*_*oth 5

如果你的字典很大,你不应该只在你的 udf 中引用它,因为每个任务的整个字典都会通过网络发送。我会结合 udf 广播你的 dict:

import org.apache.spark.broadcast.Broadcast

def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = {
  udf {(s: String) => words.value.exists(s.contains(_))}
}

df.withColumn("word_check", udf_check(sparkContext.broadcast(dict))($"words"))
Run Code Online (Sandbox Code Playgroud)

或者,您也可以使用联接:

val dict_df = dict.toList.toDF("word")

df
  .join(broadcast(dict_df),$"words".contains($"word"),"left")
  .withColumn("word_check",$"word".isNotNull)
  .drop($"word")
Run Code Online (Sandbox Code Playgroud)