Ben*_*zzo 35 java foreach scala apache-spark
我想知道foreachPartitions
,与foreach
考虑我RDD
为了对累加器变量执行一些求和的情况下的方法相比,由于更高的并行度,意志是否会产生更好的性能.
Ram*_*ram 25
foreach
并且foreachPartitions
是行动.
用于调用具有副作用的操作的通用函数.对于RDD中的每个元素,它调用传递的函数.这通常用于操纵累加器或写入外部存储器.
注意:修改除累加器之外的变量foreach()
可能会导致未定义的行为.有关详细信息,请参阅了解闭包.
例如:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
Run Code Online (Sandbox Code Playgroud)
类似于
foreach()
,但不是为每个元素调用函数,而是为每个分区调用它.该函数应该能够接受迭代器.这比foreach()
因为它减少了函数调用的数量(就像mapPartitions
())更有效.
示例用法foreachPartition
:对于每个分区,您要使用一个数据库连接(每个分区块的内部),然后这是使用scala完成它的方法的示例用法.
/** * Insert in to database using foreach partition. * * @param sqlDatabaseConnectionString * @param sqlTableName */ def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { //numPartitions = number of simultaneous DB connections you can planning to give datframe.repartition(numofpartitionsyouwant) val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // Note : Each partition one connection (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
累加器样本片段可以用它来玩...你可以通过它来测试性能
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
producer.send(message)
}
producer.close()
}
}
Run Code Online (Sandbox Code Playgroud)
foreachPartition
对分区的操作显然比它更好sparkContext.broadcast
foreachPartition
当你访问昂贵的资源,如数据库连接等时,应该使用它.这将初始化每个分区一个而不是每个元素一个(foreach
).当涉及蓄能器时,您可以通过上述测试方法测量性能,这对于蓄能器也应该更快.
另外......请参阅map vs mappartitions,它们具有相似的概念,但它们是转换.
Bin*_*ang 22
foreach
自动在许多节点上运行循环.
但是,有时您希望在每个节点上执行某些操作.例如,建立与数据库的连接.您不能只是建立连接并将其传递给foreach
函数:连接仅在一个节点上进行.
因此foreachPartition
,在运行循环之前,您可以在每个节点上建立数据库连接.
Jus*_*ony 16
foreach
和之间真的没什么区别foreachPartitions
.在幕后,所有foreach
正在做的就是foreach
使用提供的函数调用迭代器.foreachPartition
只是让你有机会在迭代器的循环之外做一些事情,通常是一些昂贵的事情,如启动数据库连接或沿着这些线路的东西.因此,如果您没有为每个节点的迭代器完成一次并在整个过程中重复使用的任何内容,那么我建议使用foreach
以提高清晰度并降低复杂性.
归档时间: |
|
查看次数: |
41689 次 |
最近记录: |