我有一个数据集,我从Spark(例如Parquet文件)访问,包含大量的行.我需要将这些行中的一些数据发送到外部服务,我需要对它们进行批处理,以便每次对外部服务的调用都包含一定数量的行(例如,每批1000行).基本上take(n)是做什么,但是在大型数据集上反复地,迭代地做.执行此类任务的好方法是什么?我想它可以foreach()手动完成并批量聚合数据,但我想知道是否有任何内置/推荐的方法.
我不知道任何内置或推荐的选项,但简单的解决方案是将RDD API和Scala IterableAPI 结合使用。如果您应用的操作是幂等的,则可以直接从工人那里进行:
val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???
rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))
Run Code Online (Sandbox Code Playgroud)
否则,您可以同时获取驱动程序的单个分区:
rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)
Run Code Online (Sandbox Code Playgroud)
请注意,每个分区需要一个单独的作业,因此最好先缓存输入rdd以避免重新计算。
在Python中,您可以使用toolz库代替IteratorAPI:
from toolz.itertoolz import partition_all
rdd.foreachPartition(
lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)])
Run Code Online (Sandbox Code Playgroud)
要么
for xs in partition_all(batchSize, rdd.toLocalIterator()):
doSomething(xs)
Run Code Online (Sandbox Code Playgroud)
当您从 parquet 文件创建 DataFrame 时,它会根据 HDFS 块位置进行分区。
因此,要问的第一个问题是否可以并行地将数据集写入外部服务。即同时从多个服务器批量发送 1000 行。
如果可以的话,那么最有效的方法就是函数foreachPartition。就像是:
df.rdd.forEachPartition { it =>
it.grouped(1000).foreach(sendBatch)
}
Run Code Online (Sandbox Code Playgroud)
如果您的外部服务无法以这种方式使用,那么第二个最佳选择是toLocalIterator:
df.rdd.toLocalIterator { it =>
it.grouped(1000).foreach(sendBatch)
}
Run Code Online (Sandbox Code Playgroud)
请注意,此解决方案的效率明显较低,因为它将序列化每个分区并将其从执行器传输到驱动程序。
| 归档时间: |
|
| 查看次数: |
1786 次 |
| 最近记录: |