Akka-stream UnsupportedOperationException when creating a Source from a Graph

Cem*_*ser 5 scala stream-processing akka akka-stream

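I am trying to join streams using subFlows. To do that, I built a Source from one of the outlets of a Broadcast. However, it throws an UnsupportedOperationException: cannot replace the shape of the EmptyModule. I tried googling this exception, but I could not find anything similar.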

Here is my code:

val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
    val bc = builder.add(Broadcast[Long](1))

    val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
    val merge = builder.add(Merge[Seq[StreamMeasurement]](1))


    broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc

    val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
      SourceShape(bc.out(0))
    })

    broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
      .map(_._2)
      .zip[Long](source)
      .takeWhile(deciderFunction)
      .map(_._1)
      .fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq :+ sm)
      .mergeSubstreams ~> merge

    FlowShape(broadcast.in, merge.out)
  })

Here is the exception I get:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at xxx$.main(Processor.scala:80)
    at xxx.Processor.main(Processor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.UnsupportedOperationException: cannot replace the shape of the EmptyModule
    at akka.stream.impl.StreamLayout$EmptyModule$.replaceShape(StreamLayout.scala:322)
    at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
    at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:801)
    at xxx.logic$$anonfun$22.apply(logic.scala:156)
    at xxx.logic$$anonfun$22.apply(logic.scala:146)
    at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:17)
    at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:801)
    at xxx.logic$.<init>(logic.scala:146)
    at xxx.logic$.<clinit>(logic.scala)
    ... 7 more

The underlying question can be found here: akka-stream Zipping Flows with SubFlows