Loi*_*oic 18 scala akka playframework akka-stream
我想使用SourceQueue将元素动态推送到Akka Stream源.Play控制器需要Source才能使用该chuncked方法传输结果.
由于Play使用自己的Akka Stream Sink,我无法使用Sink实现源队列,因为在方法使用之前源将被消耗chunked(除非我使用下面的hack).
如果我使用反应流发布器预先实现源队列,我能够使它工作,但它是一种"肮脏的黑客":
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
Run Code Online (Sandbox Code Playgroud)
是否有更简单的方法在PlayFramework中使用Akka Streams SourceQueue?
谢谢
Loi*_*oic 26
解决方案是mapMaterializedValue在源上使用以获得其队列实现的未来:
def sourceQueueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Source.tick(0.second, 1.second, "tick")
.runForeach (t => queue.offer(t))
}
Ok.chunked(queueSource)
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3307 次 |
| 最近记录: |