Akka 组合接收器而无需访问流

Jet*_*hro 4 scala akka akka-stream

我使用的 API 接受单个 AKKA Sink 并用数据填充它:

def fillSink(sink:Sink[String, _])
Run Code Online (Sandbox Code Playgroud)

有没有一种方法,无需深入研究 akka 的深度,就可以用两个接收器而不是一个接收器来处理输出?

例如

val mySink1:Sink = ...
val mySink2:Sink = ...
//something
fillSink( bothSinks )
Run Code Online (Sandbox Code Playgroud)

fillSink如果我有权访问我可以使用的方法所使用的流flow.alsoTo(mySink1).to(mySink2),但该流不会公开。

目前唯一的解决方法是传递一个处理字符串的 Sink 并将其传递给两个 StringBuilder 来替换mySink1/mySink2,但感觉这违背了 AKKA 的初衷。如果不花几天时间学习 AKKA,我无法判断是否有办法从接收器中分割输出。

谢谢!

Leo*_*o C 5

Sink运算combine符使用提供的函数组合两个或多个接收器Int => Graph[UniformFanOutShape[T, U], NotUsed]],可能就是您正在寻找的:

def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]
Run Code Online (Sandbox Code Playgroud)

一个简单的例子:

val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
val tripleSink = Sink.foreach[Int](i => println(s" Triper: ${i*3}"))

val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))

Source(List(1, 2, 3)).runWith(combinedSink)

// Doubler: 2
//  Triper: 3
// Doubler: 4
//  Triper: 6
// Doubler: 6
//  Triper: 9
Run Code Online (Sandbox Code Playgroud)