Akka Reactive Streams总是留下一条消息

SGH*_*HAF 3 scala akka reactive-streams akka-stream

出于某种原因,我的Akka流总是在"发出"(?)之前等待第二条消息.

这是一些演示我的问题的示例代码.

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)
Run Code Online (Sandbox Code Playgroud)

产量输出:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...
Run Code Online (Sandbox Code Playgroud)

我想要的是:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...
Run Code Online (Sandbox Code Playgroud)

cmb*_*ter 5

现在设置代码的方式,Source在允许开始向下游发送元素之前,您正在彻底改变它.您可以通过删除toStream代表源的数字范围来清楚地看到该行为(如@slouc所述).如果你这样做,你会看到Source在它开始响应下游需求之前先完全转变.如果你真的想运行一个Source成一个Sink,并在中间转换步骤,那么你可以尝试和结构这样的事情:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run
Run Code Online (Sandbox Code Playgroud)

如果进行了更改,那么您将获得所需的效果,即在下一个元素开始处理之前,流向下游的元素将一直处理整个流程.