在运行时动态创建Akka流流

Ste*_*ods 13 scala type-erasure akka shapeless akka-stream

我目前正在尝试在运行时动态创建Akka Stream图定义.这个想法是用户将能够以交互方式定义流并将它们附加到现有/运行BroadcastHubs.这意味着我不知道在编译时将使用哪些流甚至多少流.

不幸的是,我正在努力进行泛型/类型擦除.坦率地说,我甚至不确定我在JVM上尝试做什么.

我有一个函数将返回Flow代表两个连接的Akka Streams Flows.它使用Scala TypeTags来绕过类型擦除.如果第一个流的输出类型与第二个流的输入类型相同,则可以成功连接.这很好用.

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}

import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
                                                            b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
  Try {
    if (typeOf[B] =:= typeOf[C]) {
      val c = b.asInstanceOf[Flow[B, D, NotUsed]]

      Flow.fromGraph {
        GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
          (s1, s2) =>
            s1 ~> s2
            FlowShape(s1.in, s2.out)
        }
      }
    }
    else
      throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
  }
}
Run Code Online (Sandbox Code Playgroud)

所以,如果我有Flow[A,B]Flow[C,D],结果将是Flow[A,D]假设B和C是同一类型.

我也有尝试合并功能/减少ListFlows到一个单一的Flow.让我们假设此列表是从文件或Web请求的流定义列表派生的.

def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
  fcs match {
    case Nil => Success(None)
    case h :: Nil => Success(Some(h))
    case h :: t =>
      val n = t.head

      connect(h, n) match {
        case Success(fc) => merge(fc :: t)
        case Failure(e) => Failure(e)
      }
  }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,由于Flows存储在一个内部List,由于标准上的类型擦除Lists,我丢失了所有的类型信息,因此无法Flows在运行时连接.这是一个例子:

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)

def flowI2S() = Flow.fromFunction[Int, String](_.toString)

val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()

val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)

val y = merge(fcs)
Run Code Online (Sandbox Code Playgroud)

这导致例外:

Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)
Run Code Online (Sandbox Code Playgroud)

我一直在研究Miles Sabin的Shapeless,并认为我可以使用它HLists来保留类型信息.不幸的是,这似乎只有在编译时我知道列表的各个类型和长度时才有效.如果我上溯造型特定的HList只是HList,它看起来像我再次失去类型的信息.

val fcs: HList = a :: b :: c :: d :: HNil
Run Code Online (Sandbox Code Playgroud)

所以我的问题是......这甚至可能吗?有没有办法用Shapeless generics魔术做到这一点(最好不需要使用特定的非存在类型提取器)?我想尽可能找到通用的解决方案,任何帮助都将不胜感激.

谢谢!

Tom*_*tah -1

正如您已经注意到的,它不起作用的原因是该列表删除了您拥有的类型。因此这是不可能的。如果您知道所有可用作中间类型的类型,则可以通过添加解析函数来解决该问题。添加这样的功能也将简化您的连接方法。我将添加一个代码片段。我希望一切都清楚。

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)

def main(args: Array[String]): Unit = {
    val idInt1 = flowIdentity[Int]()
    val idInt2 = flowIdentity[Int]()
    val int2String = flowI2S()
    val idString = flowIdentity[String]()
    val fcs = List(idInt1, idInt2, int2String, idString)

    val source = Source(1 to 10)
    val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
    source.via(mergedGraph).to(Sink.foreach(println)).run()
}

def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] = {
    fcs match {
      case Nil => None
      case h :: Nil => Some(h)
      case h :: t =>
        val n = t.head

        val fc = resolveConnect(h, n)
        merge(fc :: t.tail)
    }
}

def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = {
    if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) {
      connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
    } else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) {
      connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
    } else {
      throw new UnsupportedOperationException
    }
}

def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] = {
    a.via(b)
}

def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] = {
   a.via(b)
}
Run Code Online (Sandbox Code Playgroud)

附注

还有另一个错误隐藏在那里,无限循环。当调用合并递归时,应该删除第一个元素,因为它已经合并到主流程中。