Spark数据集相当于scala的"collect",它具有部分功能

rmi*_*min 4 scala apache-spark apache-spark-dataset

常规scala集合有一个漂亮的collect方法,它允许我filter-map使用部分函数在一个传递中执行操作.火花上有相同的操作Dataset吗?

我想它有两个原因:

  • 句法简洁
  • 它将filter-map样式操作减少到一次通过(尽管在火花中我猜测有优化可以为你发现这些东西)

这是一个显示我的意思的例子.假设我有一系列选项,我想提取并加倍定义的整数(a中的那些Some):

val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5)) 
Run Code Online (Sandbox Code Playgroud)

方法1 - collect

input.collect {
  case Some(value) => value * 2
} 
// List(6, -2, 8, 10)
Run Code Online (Sandbox Code Playgroud)

collect使得这个语法非常简洁,并且一次通过.

方法2 - filter-map

input.filter(_.isDefined).map(_.get * 2)
Run Code Online (Sandbox Code Playgroud)

我可以将这种模式带到火花上,因为数据集和数据框有类似的方法.

但是,我不喜欢这个这么多,因为isDefinedget看起来像代码异味给我.有一个隐含的假设,即地图只接收Somes.编译器无法验证这一点.在一个更大的例子中,开发人员更难发现这种假设,开发人员可能会交换过滤器并映射,例如,不会出现语法错误.

方法3 - fold*操作

input.foldRight[List[Int]](Nil) {
  case (nextOpt, acc) => nextOpt match {
    case Some(next) => next*2 :: acc
    case None => acc
  }
}
Run Code Online (Sandbox Code Playgroud)

我没有使用过足够的火花来知道折叠是否有等价,所以这可能有点切线.

无论如何,模式匹配,折叠锅炉板和列表的重建都混杂在一起,很难读.


总的来说,我发现collect语法最好,我希望spark有这样的东西.

sho*_*ing 5

这里的答案是不正确的,至少在Spark的当前.

RDD实际上有一个collect方法,它采用部分函数并将过滤器和映射应用于数据.这与无参数.collect()方法完全不同.请参阅Spark源代码RDD.scala @ line 955:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}
Run Code Online (Sandbox Code Playgroud)

这不会实现RDD中的数据,而不是RDD.scala @ line 923中的无参数.collect()方法:

/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
Run Code Online (Sandbox Code Playgroud)

在文档中,请注意如何

def collect[U](f: PartialFunction[T, U]): RDD[U]
Run Code Online (Sandbox Code Playgroud)

方法并没有有关数据加载到驱动程序的内存有与之相关的警告:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@collect[U](f:PartialFunction[T,U])(implicitevidence$29: scala.reflect.ClassTag [U]):org.apache.spark.rdd.RDD [U]

Spark让这些重载方法做完全不同的事情让人非常困惑.


编辑:我的错!我误解了这个问题,我们谈论的是DataSets而不是RDD.仍然,接受的答案说

"然而,Spark文档指出,"只有在期望结果数组很小的情况下才能使用此方法,因为所有数据都被加载到驱动程序的内存中."

哪个不对!调用.collect()的部分功能版本时,数据不会加载到驱动程序的内存中 - 仅在调用无参数版本时.调用.collect(partial_function)应该具有与依次调用.filter()和.map()相同的性能,如上面的源代码所示.


ste*_*ino 2

s 和scollect上定义的方法用于具体化驱动程序中的数据。RDDDataset

尽管没有类似于 Collections APIcollect方法的东西,但您的直觉是正确的:由于这两个操作都是延迟评估的,因此引擎有机会优化操作并将它们链接起来,以便以最大的局部性执行它们。

对于您特别提到的用例,我建议您考虑一下flatMap,它适用于RDDs 和Datasets:

// Assumes the usual spark-shell environment
// sc: SparkContext, spark: SparkSession
val collection = Seq(Some(1), None, Some(2), None, Some(3))
val rdd = sc.parallelize(collection)
val dataset = spark.createDataset(rdd)

// Both operations will yield `Array(2, 4, 6)`
rdd.flatMap(_.map(_ * 2)).collect
dataset.flatMap(_.map(_ * 2)).collect

// You can also express the operation in terms of a for-comprehension
(for (option <- rdd; n <- option) yield n * 2).collect
(for (option <- dataset; n <- option) yield n * 2).collect

// The same approach is valid for traditional collections as well
collection.flatMap(_.map(_ * 2))
for (option <- collection; n <- option) yield n * 2
Run Code Online (Sandbox Code Playgroud)

编辑

正如另一个问题中正确指出的那样,RDDs 实际上有通过应用部分函数collect来转换 an 的方法RDD,就像在普通集合中发生的那样。然而,正如Spark 文档指出的那样,“只有当结果数组预计很小时才应使用此方法,因为所有数据都加载到驱动程序的内存中。”

  • 我认为使用 PartialFunction 进行收集没有这个问题...警告位于另一种方法中(不带参数的收集)。 (5认同)