如何动态地将Source添加到现有Graph?

exp*_*ert 5 scala akka akka-stream

什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),
    sourceNews.via(converterFlow2),
    sourceCit)(Merge(_))

val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()
Run Code Online (Sandbox Code Playgroud)

问题是我每24小时从内容提供商处获取一次数据,每2小时一次从新闻获取数据,最后一个来源可能随时来自,因为它来自人类.

我意识到图形是不可变的,但我如何定期将新实例附加Source到我的图形中,以便我对摄取过程进行单点限制?

更新:您可以说我的数据是Source-s 流,在我的情况下是三个来源.但我不能改变它,因为我Source从外部类(所谓的插件)获取实例.这些插件独立于我的摄取类.我不能将它们组合成一个巨大的类来拥有单一的Source.

Vla*_*eev 3

好吧,一般来说,正确的方法是将源流加入到单个源中,即从Source[Source[T, _], Whatever]Source[T, Whatever]flatMapConcat这可以通过或 来完成flatMapMerge。因此,如果您可以获得 a Source[Source[Article, NotUsed], NotUsed],则可以使用其中一种flatMap*变体并获得最终的Source[Article, NotUsed]。对你的每个来源都这样做(没有双关语),然后你原来的方法应该起作用。