exp*_*ert 5 scala akka akka-stream
什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))
val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)
问题是我每24小时从内容提供商处获取一次数据,每2小时一次从新闻获取数据,最后一个来源可能随时来自,因为它来自人类.
我意识到图形是不可变的,但我如何定期将新实例附加Source到我的图形中,以便我对摄取过程进行单点限制?
更新:您可以说我的数据是Source-s 流,在我的情况下是三个来源.但我不能改变它,因为我Source从外部类(所谓的插件)获取实例.这些插件独立于我的摄取类.我不能将它们组合成一个巨大的类来拥有单一的Source.
好吧,一般来说,正确的方法是将源流加入到单个源中,即从Source[Source[T, _], Whatever]到Source[T, Whatever]。flatMapConcat这可以通过或 来完成flatMapMerge。因此,如果您可以获得 a Source[Source[Article, NotUsed], NotUsed],则可以使用其中一种flatMap*变体并获得最终的Source[Article, NotUsed]。对你的每个来源都这样做(没有双关语),然后你原来的方法应该起作用。