我目前正在尝试在运行时动态创建Akka Stream图定义.这个想法是用户将能够以交互方式定义流并将它们附加到现有/运行BroadcastHubs.这意味着我不知道在编译时将使用哪些流甚至多少流.
不幸的是,我正在努力进行泛型/类型擦除.坦率地说,我甚至不确定我在JVM上尝试做什么.
我有一个函数将返回Flow代表两个连接的Akka Streams Flows.它使用Scala TypeTags来绕过类型擦除.如果第一个流的输出类型与第二个流的输入类型相同,则可以成功连接.这很好用.
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}
def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
Try {
if (typeOf[B] =:= typeOf[C]) {
val c = b.asInstanceOf[Flow[B, D, NotUsed]]
Flow.fromGraph {
GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
(s1, s2) =>
s1 ~> …Run Code Online (Sandbox Code Playgroud)