Ste*_*ods 13 scala type-erasure akka shapeless akka-stream
我目前正在尝试在运行时动态创建Akka Stream图定义.这个想法是用户将能够以交互方式定义流并将它们附加到现有/运行BroadcastHubs.这意味着我不知道在编译时将使用哪些流甚至多少流.
不幸的是,我正在努力进行泛型/类型擦除.坦率地说,我甚至不确定我在JVM上尝试做什么.
我有一个函数将返回Flow代表两个连接的Akka Streams Flows.它使用Scala TypeTags来绕过类型擦除.如果第一个流的输出类型与第二个流的输入类型相同,则可以成功连接.这很好用.
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}
def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
Try {
if (typeOf[B] =:= typeOf[C]) {
val c = b.asInstanceOf[Flow[B, D, NotUsed]]
Flow.fromGraph {
GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
(s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
}
}
}
else
throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
}
}
Run Code Online (Sandbox Code Playgroud)
所以,如果我有Flow[A,B]和Flow[C,D],结果将是Flow[A,D]假设B和C是同一类型.
我也有尝试合并功能/减少List的Flows到一个单一的Flow.让我们假设此列表是从文件或Web请求的流定义列表派生的.
def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
fcs match {
case Nil => Success(None)
case h :: Nil => Success(Some(h))
case h :: t =>
val n = t.head
connect(h, n) match {
case Success(fc) => merge(fc :: t)
case Failure(e) => Failure(e)
}
}
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,由于Flows存储在一个内部List,由于标准上的类型擦除Lists,我丢失了所有的类型信息,因此无法Flows在运行时连接.这是一个例子:
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)
val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()
val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)
val y = merge(fcs)
Run Code Online (Sandbox Code Playgroud)
这导致例外:
Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)
Run Code Online (Sandbox Code Playgroud)
我一直在研究Miles Sabin的Shapeless,并认为我可以使用它HLists来保留类型信息.不幸的是,这似乎只有在编译时我知道列表的各个类型和长度时才有效.如果我上溯造型特定的HList只是HList,它看起来像我再次失去类型的信息.
val fcs: HList = a :: b :: c :: d :: HNil
Run Code Online (Sandbox Code Playgroud)
所以我的问题是......这甚至可能吗?有没有办法用Shapeless generics魔术做到这一点(最好不需要使用特定的非存在类型提取器)?我想尽可能找到通用的解决方案,任何帮助都将不胜感激.
谢谢!
Tom*_*tah -1
正如您已经注意到的,它不起作用的原因是该列表删除了您拥有的类型。因此这是不可能的。如果您知道所有可用作中间类型的类型,则可以通过添加解析函数来解决该问题。添加这样的功能也将简化您的连接方法。我将添加一个代码片段。我希望一切都清楚。
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)
def main(args: Array[String]): Unit = {
val idInt1 = flowIdentity[Int]()
val idInt2 = flowIdentity[Int]()
val int2String = flowI2S()
val idString = flowIdentity[String]()
val fcs = List(idInt1, idInt2, int2String, idString)
val source = Source(1 to 10)
val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
source.via(mergedGraph).to(Sink.foreach(println)).run()
}
def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] = {
fcs match {
case Nil => None
case h :: Nil => Some(h)
case h :: t =>
val n = t.head
val fc = resolveConnect(h, n)
merge(fc :: t.tail)
}
}
def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = {
if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) {
connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
} else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) {
connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
} else {
throw new UnsupportedOperationException
}
}
def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] = {
a.via(b)
}
def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] = {
a.via(b)
}
Run Code Online (Sandbox Code Playgroud)
附注
还有另一个错误隐藏在那里,无限循环。当调用合并递归时,应该删除第一个元素,因为它已经合并到主流程中。
| 归档时间: |
|
| 查看次数: |
569 次 |
| 最近记录: |