Apache Spark - foreach与foreachPartitions何时使用什么?

Ben*_*zzo 35 java foreach scala apache-spark

我想知道foreachPartitions,与foreach考虑我RDD为了对累加器变量执行一些求和的情况下的方法相比,由于更高的并行度,意志是否会产生更好的性能.

Ram*_*ram 25

foreach并且foreachPartitions是行动.

foreach(功能):单位

用于调用具有副作用的操作的通用函数.对于RDD中的每个元素,它调用传递的函数.这通常用于操纵累加器或写入外部存储器.

注意:修改除累加器之外的变量foreach()可能会导致未定义的行为.有关详细信息,请参阅了解闭包.

例如:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
Run Code Online (Sandbox Code Playgroud)

foreachPartition(function):单位

类似于foreach(),但不是为每个元素调用函数,而是为每个分区调用它.该函数应该能够接受迭代器.这比foreach()因为它减少了函数调用的数量(就像mapPartitions())更有效.

示例用法foreachPartition:对于每个分区,您要使用一个数据库连接(每个分区块的内部),然后这是使用scala完成它的方法的示例用法.

/**
    * Insert in to database using foreach partition.
    *
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    //numPartitions = number of simultaneous DB connections you can planning to give

datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection.createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
      }


      sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
    }
  }

累加器样本片段可以用它来玩...你可以通过它来测试性能

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}
Run Code Online (Sandbox Code Playgroud)

结论:

foreachPartition 对分区的操作显然比它更好 sparkContext.broadcast

经验法则:

foreachPartition当你访问昂贵的资源,如数据库连接等时,应该使用它.这将初始化每个分区一个而不是每个元素一个(foreach).当涉及蓄能器时,您可以通过上述测试方法测量性能,这对于蓄能器也应该更快.

另外......请参阅map vs mappartitions,它们具有相似的概念,但它们是转换.

  • 一个很棒的解释可以请您添加一些方案,其中foreach分区会比foreach慢(如果让我们说累加器),因为在这种情况下foreachpartition会在内部调用foreach。 (2认同)

Bin*_*ang 22

foreach 自动在许多节点上运行循环.

但是,有时您希望在每个节点上执行某些操作.例如,建立与数据库的连接.您不能只是建立连接并将其传递给foreach函数:连接仅在一个节点上进行.

因此foreachPartition,在运行循环之前,您可以在每个节点上建立数据库连接.

  • 这仍然不是每个节点,它是每个分区.可以有比节点多的分区.如果每个节点需要一个连接(更可能是每个JVM或YARN术语中的容器),那么您需要一些其他解决方案. (9认同)

Jus*_*ony 16

foreach和之间真的没什么区别foreachPartitions.在幕后,所有foreach正在做的就是foreach使用提供的函数调用迭代器.foreachPartition只是让你有机会在迭代器的循环之外做一些事情,通常是一些昂贵的事情,如启动数据库连接或沿着这些线路的东西.因此,如果您没有为每个节点的迭代器完成一次并在整个过程中重复使用的任何内容,那么我建议使用foreach以提高清晰度并降低复杂性.