Akka流。一次控制Akka流中正在处理的项目数

And*_*ans 5 scala akka hazelcast akka-stream

Akka流大大减少了我的样板代码,并包含许多有用的功能。但是,我需要能够限制项目的处理速度。问题在于,我正在向连接到资源的源链接的Hazelcast队列馈送,以便随着时间的推移(从单个在线站点)下载资源,但是进入队列的链接数量可能会非常大。理想情况下,一次最多运行50-60个请求。Akka Streams中是否有一项功能可以让我限制一次处理的项目数?

进一步的限制是在与某些网站进行交互时需要复杂的状态管理,代码处理和其他功能。Akka Http无法在此提供帮助。我的网络代码完全用Jsoup和Apache Http Components编写,偶尔会调用基于JavaFX的服务器来呈现脚本。

我当前尝试使用文档中描述的使用缓冲区控制输入速率的方法如下:

val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)     
val (killSwitch, last) = source
                .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
                .viaMat(new DownloadFlow())(Keep.both)
                .map(x => println(x))
                .to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

Ste*_*tti 4

您正在寻找的机制是mapAsync(或者mapAsyncUnordered,如果不需要保留排序 - 就像您的示例中那样)。这些组合器采用一个parallelism参数,其目的是限制该阶段可以运行的并行任务的数量。

它应该成为您的一部分DownloadFlow。假设您DownloadFlow运行异步代码,您可以这样构造它:

def download(input: Input): Future[Output] = ???

val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download)

val (killSwitch, last) = source
                .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
                .viaMat(downloadFlow)(Keep.both)
                .map(x => println(x))
                .to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

由于您的下载流程具有有意义的物化价值,因此它可能会稍微复杂一些,但希望您明白这一点。

请参阅文档以获取更多信息。