Spark DataFrame过滤:保留属于列表的元素

Ram*_*ami 4 scala dataframe apache-spark apache-spark-sql apache-zeppelin

我在Zeppelin笔记本上使用Spark 1.5.1和Scala.

  • 我有一个DataFrame,其中一个名为userID的列为Long类型.
  • 总共我有大约400万行和200,000个唯一用户ID.
  • 我还有一个要排除的50,000个userID的列表.
  • 我可以轻松构建要保留的userID列表.

删除属于要排除的用户的所有行的最佳方法是什么?

提出同一问题的另一种方法是:保持属于用户的行的最佳方法是什么?

我看到这篇文章并应用了它的解决方案(参见下面的代码),但执行速度很慢,知道我在我的本地机器上运行SPARK 1.5.1,我有一个16GB的内存和初始DataFrame适合于记忆.

这是我申请的代码:

import org.apache.spark.sql.functions.lit
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))
Run Code Online (Sandbox Code Playgroud)

在上面的代码中:

  • initialDataFrame有3885068行,每行有5列,其中一列称为userID,它包含Long值.
  • listOfUsersToKeep是一个Array [Long],它包含150,000个Long userID.

我想知道是否有比我使用的更有效的解决方案.

谢谢

zer*_*323 8

你可以使用join:

val usersToKeep = sc.parallelize(
  listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")

val finalDataFrame = usersToKeep
  .join(initialDataFrame, $"userID" === $"userID_")
  .drop("userID_")
Run Code Online (Sandbox Code Playgroud)

或广播变量和UDF:

import org.apache.spark.sql.functions.udf

val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
Run Code Online (Sandbox Code Playgroud)

也可以广播DataFrame:

import org.apache.spark.sql.functions.broadcast

initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
Run Code Online (Sandbox Code Playgroud)