有条件地将`filter` /`where`应用于Spark`Dataset` /`Dataframe`

Vas*_*kas 2 scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

嗨,大家好,我有一个函数可以从S3的某些位置加载数据集并返回有趣的数据

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  // pick rows for the given marketplaces
  .where($"mid".isin(mids: _*))
  // pick rows for the given indices
  .where($"index".isin(indices: _*))
Run Code Online (Sandbox Code Playgroud)

}

如果有人提供mids = Seq()或,此实现将过滤掉所有内容indices = Seq()。另一方面,我希望语义是“仅在mids不为空的情况下应用此where子句”(与相同indices),这样,如果函数的用户提供空序列,则不会进行过滤。

有没有很好的功能方法可以做到这一点?

Sim*_*Sim 5

如果您不介意逻辑有些复杂,拉斐尔·罗斯(Raphael Roth)的答案是解决应用过滤器这一特定问题的不错选择。通用解决方案适用于任何条件转换(不只是过滤,而且不对决策分支之一执行任何操作)transform,例如,使用

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  .transform { ds =>
    // pick rows for the given marketplaces
    if (mids.isEmpty) ds
    else ds.where($"mid".isin(mids: _*))
  }
  .transform { ds =>
    // pick rows for the given indices
    if (indices.isEmpty) ds
    else ds.where($"index".isin(indices: _*))
  }
Run Code Online (Sandbox Code Playgroud)

如果您使用的是稳定的类型的数据集(或dataframes,这是Dataset[Row]),transform是非常有用的,你可以建立的转换函数序列,然后再应用:

transformations.foldLeft(ds)(_ transform _)
Run Code Online (Sandbox Code Playgroud)

在许多情况下,这种方法有助于代码重用和可测试性。