exp*_*ert 5 scala akka akka-stream
我正在尝试使用新的Akka流,并想知道我如何使用并将源队列返回给调用者而不在我的代码中实现它?
想象一下,我们有一些库可以进行多次异步调用并返回结果Source.功能看起来像这样
def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {
val source = Source.queue[String](100, backpressure)
source.mapMaterializedValue { case queue =>
val url = s"http://.....&term=$text"
httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
v.idlist.foreach { id =>
queue.offer(id)
}
queue.complete()
}
}
source
}
Run Code Online (Sandbox Code Playgroud)
和调用者可能会像这样使用它
// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)
Run Code Online (Sandbox Code Playgroud)
当我运行此代码时,mapMaterializedValue永远不会执行.
我无法理解为什么我无法访问实例,SourceQueue如果应该由调用者来决定如何实现源代码.
我该如何实现呢?
在您的代码示例中,您将返回source而不是返回值source.mapMaterializedValue(方法调用不会改变Source对象).