Rem*_*coW 3 scala akka akka-stream
我有一个SourceQueue.当我向它提供一个元素时,我希望它通过它Stream,当它到达时,Sink输出返回到提供此元素的代码(类似于Sink.head返回一个元素到RunnableGraph.run()调用).
我该如何实现这一目标?我的问题的一个简单例子是:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
Run Code Online (Sandbox Code Playgroud)
编辑:我在这个问题中给出的例子弗拉基米尔马特维耶夫提供了最好的答案.但是,应该注意的是,只有当元素sink按照它们提供给它们的相同顺序进入时,此解决方案才有效source.如果无法保证这一点,则元素的顺序sink可能不同,结果可能与预期的不同.
我相信使用已经存在的原语来从流中提取值(称为)更简单Sink.queue.这是一个例子:
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
def getNext: String = Await.result(sinkQueue.pull(), 1.second).get
sourceQueue.offer("foo")
println(getNext)
sourceQueue.offer("bar")
println(getNext)
sourceQueue.offer("baz")
println(getNext)
Run Code Online (Sandbox Code Playgroud)
它完全符合你的要求.
请注意,设置inputBuffer队列接收器的属性对于您的用例可能重要也可能不重要 - 如果您未设置它,则缓冲区将为零大小,并且在您调用pull()方法之前数据不会流经流在水槽上.
sinkQueue.pull()产生a Future[Option[T]],Some如果接收器接收到元素,则成功完成,如果流失败,则成功完成.如果流正常完成,则将完成None.在这个特定的例子中,我通过使用忽略了这一点,Option.get但你可能想要添加自定义逻辑来处理这种情况.