Vas*_*kas 2 scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
嗨,大家好,我有一个函数可以从S3的某些位置加载数据集并返回有趣的数据
private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where($"mid".isin(mids: _*))
// pick rows for the given indices
.where($"index".isin(indices: _*))
Run Code Online (Sandbox Code Playgroud)
}
如果有人提供mids = Seq()或,此实现将过滤掉所有内容indices = Seq()。另一方面,我希望语义是“仅在mids不为空的情况下应用此where子句”(与相同indices),这样,如果函数的用户提供空序列,则不会进行过滤。
有没有很好的功能方法可以做到这一点?
如果您不介意逻辑有些复杂,拉斐尔·罗斯(Raphael Roth)的答案是解决应用过滤器这一特定问题的不错选择。通用解决方案适用于任何条件转换(不只是过滤,而且不对决策分支之一执行任何操作)transform,例如,使用
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
.transform { ds =>
// pick rows for the given marketplaces
if (mids.isEmpty) ds
else ds.where($"mid".isin(mids: _*))
}
.transform { ds =>
// pick rows for the given indices
if (indices.isEmpty) ds
else ds.where($"index".isin(indices: _*))
}
Run Code Online (Sandbox Code Playgroud)
如果您使用的是稳定的类型的数据集(或dataframes,这是Dataset[Row]),transform是非常有用的,你可以建立的转换函数序列,然后再应用:
transformations.foldLeft(ds)(_ transform _)
Run Code Online (Sandbox Code Playgroud)
在许多情况下,这种方法有助于代码重用和可测试性。
| 归档时间: |
|
| 查看次数: |
1836 次 |
| 最近记录: |