从 Spark 调用休息服务

Yas*_*ash 8 rest scala apache-spark restapi

我试图找出从 Spark 调用 Rest 端点的最佳方法。

我目前的方法(解决方案 [1])看起来像这样 -

val df = ... // some dataframe

val repartitionedDf = df.repartition(numberPartitions)

lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)

val enrichedDf = repartitionedDf 
.map(rec => restEndPoint.getResponse(rec)) // calls the rest endpoint for every record
.toDF
Run Code Online (Sandbox Code Playgroud)

我知道我可以使用 .mapPartitions() 而不是 .map(),但是查看 DAG,看起来 spark 优化了重新分区 -> 无论如何映射到 mapPartition。

在第二种方法(解决方案 [2])中,为每个分区创建一次连接,并为分区内的所有记录重用。

  val newDs = myDs.mapPartitions(partition => {
  val restEndPoint = new restEndPointCaller /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    restEndPoint.getResponse(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  restEndPoint.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})
Run Code Online (Sandbox Code Playgroud)

在这第三种方法(解决方案 [3])中,每个 JVM(执行程序)创建一次连接,在执行程序处理的所有分区中重复使用。

    lazy val connection = new DbConnection /*creates a db connection per partition*/
    val newDs = myDs.mapPartitions(partition => {

          val newPartition = partition.map(record => {
            readMatchingFromDB(record, connection)
          }).toList // consumes the iterator, thus calls readMatchingFromDB 
        
          newPartition.iterator // create a new iterator

        })
    connection.close() // close dbconnection here
Run Code Online (Sandbox Code Playgroud)

[a] 对于非常相似的解决方案 [1] 和 [3],我对惰性 val 的工作方式的理解是否正确?目的是将每个 executor/JVM 的连接数限制为 1,并重用打开的连接来处理后续请求。我会为每个 JVM 创建 1 个连接还是每个分区创建 1 个连接?

[b] 还有其他方法可以控制我们向其余端点发出的请求 (RPS) 数量吗?

[c] 如果有更好、更有效的方法来做到这一点,请告诉我。

谢谢!

Bar*_*zny 5

IMO 的第二个解决方案mapPartitions更好。首先,您明确说明您期望实现的目标。转换的名称和实现的逻辑非常清楚地说明了这一点。对于第一个选项,您需要了解 Apache Spark 如何优化处理。这对您来说可能是显而易见的,但您还应该考虑将在您的代码上工作的人,或者只是在 6 个月、1 年、2 年等等之后处理您的代码。他们应该mapPartitionsrepartition+更好地理解map

此外,使用 map 重新分区的优化可能会在内部发生变化(我不相信它,但您仍然可以认为这是一个有效的点),此时您的工作表现会更差。

最后,使用第二个解决方案,您可以避免序列化可能遇到的许多问题。在您编写的代码中,驱动程序将创建端点对象的一个​​实例,将其序列化并发送到执行程序。所以是的,也许它会是一个单一的实例,但前提是它是可序列化的。


[编辑] 感谢您的澄清。您可以通过不同的方式实现您的目标。要让每个 JVM 恰好有 1 个连接,您可以使用称为单例的设计模式。在 Scala 中,它很容易表示为object(我在 Google https://alvinalexander.com/scala/how-to-implement-singleton-pattern-in-scala-with-object上找到的第一个链接)

而且它非常好,因为您不需要序列化任何东西。单例直接从执行器端的类路径中读取。有了它,你肯定只有一个给定对象的实例。

[a] 对于非常相似的解决方案 [1] 和 [3],我对惰性 val 的工作方式的理解是否正确?目的是将每个 executor/JVM 的连接数限制为 1,并重用打开的连接来处理后续请求。我会为每个 JVM 创建 1 个连接还是每个分区创建 1 个连接?它将为每个分区创建 1 个连接。您可以执行这个小测试来查看:

  class SerializationProblemsTest extends FlatSpec   {
    val conf = new SparkConf().setAppName("Spark serialization problems test").setMaster("local") 
    val sparkContext = SparkContext.getOrCreate(conf)
    "lazy object" should "be created once per partition" in {
      lazy val restEndpoint = new NotSerializableRest()
      sparkContext.parallelize(0 to 120).repartition(12)
        .mapPartitions(numbers => {
           //val restEndpoint = new NotSerializableRest()
           numbers.map(nr => restEndpoint.enrich(nr))
      })
      .collect()
   }
 }  
 class NotSerializableRest() {
   println("Creating REST instance")
   def enrich(id: Int): String = s"${id}"
}
Run Code Online (Sandbox Code Playgroud)

它应该打印创建 REST 实例12 次(分区数)

[b] 有什么方法可以控制我们向其余端点发出的请求 (RPS) 数量?

要控制请求数量,您可以使用类似于数据库连接池的方法:HTTP 连接池(一个快速找到的链接:使用 HttpClient 的 HTTP 连接池)。

但也许另一种有效的方法是处理较小的数据子集?因此,您无需处理 30000 行,而是可以将其拆分为不同的较小微批次(如果它是流式作业)。它应该让您的网络服务多一点“休息”。

否则,您也可以尝试发送批量请求(Elasticsearch 这样做是为了一次索引/删除多个文档https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)。但这取决于 Web 服务是否允许您这样做。