为Spark迭代take()或批处理?

Sta*_*asM 7 apache-spark

我有一个数据集,我从Spark(例如Parquet文件)访问,包含大量的行.我需要将这些行中的一些数据发送到外部服务,我需要对它们进行批处理,以便每次对外部服务的调用都包含一定数量的行(例如,每批1000行).基本上take(n)是做什么,但是在大型数据集上反复地,迭代地做.执行此类任务的好方法是什么?我想它可以foreach()手动完成并批量聚合数据,但我想知道是否有任何内置/推荐的方法.

zer*_*323 5

我不知道任何内置或推荐的选项,但简单的解决方案是将RDD API和Scala IterableAPI 结合使用。如果您应用的操作是幂等的,则可以直接从工人那里进行:

val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???

rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))
Run Code Online (Sandbox Code Playgroud)

否则,您可以同时获取驱动程序的单个分区:

rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)
Run Code Online (Sandbox Code Playgroud)

请注意,每个分区需要一个单独的作业,因此最好先缓存输入rdd以避免重新计算。

在Python中,您可以使用toolz库代替IteratorAPI:

from toolz.itertoolz import partition_all

rdd.foreachPartition(
  lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)])
Run Code Online (Sandbox Code Playgroud)

要么

for xs in partition_all(batchSize, rdd.toLocalIterator()):
    doSomething(xs)
Run Code Online (Sandbox Code Playgroud)


kos*_*tya 4

当您从 parquet 文件创建 DataFrame 时,它​​会根据 HDFS 块位置进行分区。

因此,要问的第一个问题是否可以并行地将数据集写入外部服务。即同时从多个服务器批量发送 1000 行。

如果可以的话,那么最有效的方法就是函数foreachPartition。就像是:

df.rdd.forEachPartition { it =>
  it.grouped(1000).foreach(sendBatch)
}
Run Code Online (Sandbox Code Playgroud)

如果您的外部服务无法以这种方式使用,那么第二个最佳选择是toLocalIterator

df.rdd.toLocalIterator { it =>
  it.grouped(1000).foreach(sendBatch)
}
Run Code Online (Sandbox Code Playgroud)

请注意,此解决方案的效率明显较低,因为它将序列化每个分区并将其从执行器传输到驱动程序。

  • 如何在java中实现同样的事情? (2认同)