Scala 线程池 - 并发调用 API

Abh*_*bhi 5 functional-programming scala blockingqueue apache-spark databricks


我在 databricks 中有一个用例,其中必须对 URL 数据集进行 API 调用。该数据集大约有 100K 条记录。允许的最大并发数是 3。
我在 Scala 中实现并在 databricks 笔记本中运行。除了队列中待处理的一个元素之外,我觉得这里还缺少一些东西。
阻塞队列和线程池是解决这个问题的正确方法吗?

在下面的代码中,我进行了修改,而不是从数据集中读取,而是在 Seq 上进行采样。任何帮助/想法将不胜感激。

 
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit; 

var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)

val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))


val pool = Executors.newFixedThreadPool(3) 
var i = 0
inpDS.foreach{
  ix => {

    inpQueue.put(ix)
    val t = new ConsumerAPIThread()
    t.setName("MyThread-"+i+" ")
    pool.execute(t)

  }
   i = i+1
}

println("Final Queue Size = " +inpQueue.size+"\n")


class ConsumerAPIThread() extends Thread  
{ 
  var name =""

    override def run() 
    { 
        val urlDetail =  inpQueue.take()
        print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n") 
      triggerAPI((urlDetail._1, urlDetail._2))
    } 

    def triggerAPI(params:(Int,String)){

    try{
      val result = scala.io.Source.fromURL(params._2)
      println("" +result)
    }catch{
     case ex:Exception  => {

       println("Exception caught")
       }

    }

  }
   def ConsumerAPIThread(s:String) 
    { 
        name = s; 
    } 
}
Run Code Online (Sandbox Code Playgroud)

ste*_*ino 5

因此,您有两个要求:功能性要求是您想要异步处理列表中的项目,非功能性要求是您不希望一次处理超过三个项目。

关于后者,好的一点是,正如您在问题中已经表明的那样,Java 本机公开了一个封装良好的任务,Executor该任务在固定大小的线程池上运行,如果您使用线程,则可以优雅地允许您限制并发级别。

转向功能需求,Scala 通过在其标准 API 中提供一些能够精确满足功能需求的东西来提供帮助。特别是它使用scala.concurrent.Future,因此为了使用它,我们必须triggerAPI根据重新构建Future。该函数的内容并不是特别相关,因此我们现在主要关注其(修订后的)签名:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    // some code that takes some time to run...
  }
Run Code Online (Sandbox Code Playgroud)

请注意,现在triggerAPI返回一个Future. AFuture可以被认为是最终要计算的内容的读取句柄。特别是,这是一个Future[Unit],其中Unit代表“我们并不特别关心这个函数的输出,但主要关心它的副作用”

此外,请注意该方法现在采用隐式参数,即ExecutionContext。用于ExecutionContextFutures 提供某种形式的计算发生环境。ExecutionContextScala 有一个 API 可以从 a创建 an java.util.concurrent.ExecutorService,因此这对于在固定线程池上运行我们的计算非常有用,在任何给定时间运行不超过三个回调。

在继续之前,如果您对Futures、ExecutionContexts 和隐式参数有疑问,Scala 文档是您最好知识来源(这里有一些提示:1、2)。

现在我们有了新triggerAPI方法,我们可以使用Future.traverse这里是 Scala 2.12 的文档——撰写本文时最新版本是 2.13,但据我所知,Spark 用户暂时停留在 2.12)。

简而言之它采用某种形式的容器和一个函数,该Future.traverse函数获取该容器中的项目并返回Future其他内容。该函数将应用于容器中的每个项目,结果将是Future结果容器的一个。在你的情况下:容器是 a List,物品是(Int, String),你返回的其他东西是 a Unit

这意味着您可以简单地这样调用它:

Future.traverse(inpDS)(triggerAPI)
Run Code Online (Sandbox Code Playgroud)

并将triggerAPI应用于 中的每个项目inpDS

通过确保调用时线程池支持的执行上下文位于隐式范围内Future.traverse,项目将使用所需的线程池进行处理。

调用的结果是Future[List[Unit]],这不是很有趣,可以简单地丢弃(因为您只对副作用感兴趣)。

说了很多,如果您想使用我描述的代码,您可以在 Scastie 上进行操作。

作为参考,这是整个实现:

import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration.DurationLong
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

val datasets = List(
  (1, "https://google.com/2X6barD"),
  (2, "https://google.com/3d9vCgW"),
  (3, "https://google.com/2M02Xz0"),
  (4, "https://google.com/2XOu2uL"),
  (5, "https://google.com/2AfBWF0"),
  (6, "https://google.com/36AEKsw"),
  (7, "https://google.com/3enBxz7"),
  (8, "https://google.com/36ABq0x"),
  (9, "https://google.com/2XBjmiF")
)

val executor: ExecutorService = Executors.newFixedThreadPool(3)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    val (index, _) = params
    println(s"+ started processing $index")
    val start = System.nanoTime() / 1000000
    Iterator.from(0).map(_ + 1).drop(100000000).take(1).toList.head // a noticeably slow operation
    val end = System.nanoTime() / 1000000
    val duration = (end - start).millis
    println(s"- finished processing $index after $duration")
  }

Future.traverse(datasets)(triggerAPI).onComplete {
  case result =>
    println("* processing is over, shutting down the executor")
    executionContext.shutdown()
}
Run Code Online (Sandbox Code Playgroud)