为什么Source.fromIterator需要一个Function0 [Iterator [T]]作为参数而不是Iterator [T]?

joh*_*ohn 5 scala akka-stream

基于:源代码

我不明白为什么参数Source.fromIteratorFunction0[Iterator[T]]而不是Iterator[T].

这有一个实际的原因吗?我们可以def fromIterator(iterator: => Iterator[T])改为签名吗?(为了避免这样做Source.fromIterator( () => myIterator))

Mik*_*ame 10

根据文档:

迭代器将为每个实现重新创建,这就是该方法直接使用函数而不是迭代器的原因.

流阶段应该是可重复使用的,因此您可以实现多个阶段.但是,给定的迭代器(通常)只能被消耗一次.如果fromIterator创建了一个引用现有迭代器的Source(无论是通过名称还是引用传递),那么实现它的第二次尝试可能会失败,因为底层迭代器将会耗尽.

为了解决这个问题,源需要能够实例化一个新的迭代器,因此fromIterator允许您提供必要的逻辑来作为供应商函数.

这是我们不想发生的事情的一个例子:

implicit val system = akka.actor.ActorSystem.create("test")
implicit val mat = akka.stream.ActorMaterializer(system)

val iter = Iterator.range(0, 2)
// pretend we pass the iterator directly...
val src = Source.fromIterator(() => iter)

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res0: akka.Done = Done

Await.result(src.runForEach(println), 2.seconds)
// res1: akka.Done = Done
// No results???
Run Code Online (Sandbox Code Playgroud)

这很糟糕,因为Source src不可重复使用,因为它在后续运行中不会提供相同的输出.但是,如果我们懒惰地创建迭代器,它可以工作:

val iterFunc = () => Iterator.range(0, 2)
val src = Source.fromIterator(iterFunc)

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res0: akka.Done = Done

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res1: akka.Done = Done
Run Code Online (Sandbox Code Playgroud)