我使用graph dsl根据我看到的一些示例代码创建了一些流处理作业.一切都运行得很好,我只是很难理解符号:(更新为2.4)
def elements: Source[Foos] = ...
def logEveryNSink = // a sink that logs
def cleaner: Flow[Foos, Bars, Unit] = ...
def boolChecker(bar: Bar)(implicit ex: ExecutionContext): Future[Boolean] = ...
val mySink = Sink.foreach[Boolean](println(_))
val lastly = Flow[Bars].mapAsync(2)(x => boolChecker(x).toMat(mySink)(Keep.right)
val materialized = RunnableGraph.fromGraph(
GraphDSL.create(lastly) { implicit builder =>
baz => {
import GraphDSL.Implicits._
val broadcast1 = builder.add(Broadcast[Foos](2))
val broadcast2 = builder.add(Broadcast[Bars](2))
elements ~> broadcast1 ~> logEveryNSink(1)
broadcast1 ~> cleaner ~> broadcast2 ~> baz
~> broadcast2 ~> logEveryNSink(1)
ClosedShape
}
}
).run()
Run Code Online (Sandbox Code Playgroud)
我理解包含的隐式构建器,但我不确定它baz代表什么{ implicit builder => baz => { ....它只是整个形状的隐含名称吗?
该GraphDSL.create方法严重超载以获取大量输入形状(包括0)的变体.如果没有传入初始形状,那么buildBlock函数arg 的签名(实际定义图形构建方式的主体)如下:
(Builder[NotUsed]) => S
Run Code Online (Sandbox Code Playgroud)
所以这只是一个Function1[Builder[NotUsed], S]函数,它接受一个实例Builder[NotUsed]并返回一个Shape最终图形的实例.这NotUsed就是同义词,Unit因为你没有传入任何你不关心正在生成的输出图的具体化值的输入共享.
如果您决定传入输入形状,那么该buildBlock函数的签名会稍微改变以适应输入形状.在您的情况下,您传入1个输入形状,因此buildBlock更改的签名为:
(Builder[Mat]) => Graph.Shape => S
Run Code Online (Sandbox Code Playgroud)
现在,这本质上是一个Function1[Builder[Mat], Function1[Graph.Shape, S]]或者一个函数,它接受一个Builder[Mat](其中Mat是输入形状的具体化值类型)并返回一个函数,该函数接受Graph.Shape并返回一个实例S(即a Shape).
简而言之,如果你传入形状,那么你还需要在图形构建块函数中将它们声明为绑定参数,但作为第二个输入函数(因此是附加的=>).