在Akka流中创建来自Future的背压

Ale*_*ran 7 scala akka akka-stream

我是Akka流和流的新手,因此我可能在概念层面上完全误解了某些东西,但是有什么方法可以创建背压直到未来结算?基本上我想做的是这样的:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???
Run Code Online (Sandbox Code Playgroud)

字节被从文件中读取并且传输到分析器,其发射SeqExampleObjects和那些被流式传输到一个返回一个异步操作Future.我想做到这一点,直到Future结算,流的其余部分得到反压,然后一旦Future解决后再恢复,再传递Seq[ExampleObject]doAsyncOp,恢复背压等等.

现在我有这个工作:

Await.result(doAsyncOp(values), 10 seconds)
Run Code Online (Sandbox Code Playgroud)

但我的理解是,这会锁定整个线程并且很糟糕.有没有更好的方法呢?

如果它有帮助,那么大局是我正在尝试用Jawn解析一个非常大的JSON文件(太大而不适合内存),然后将对象传递给ElasticSearch,以便在它们被解析时被编入索引 - ElasticSearch有一个包含50个待处理操作的队列,如果溢出则开始拒绝新对象.

exp*_*ert 10

这很容易.你需要用mapAync:)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)
Run Code Online (Sandbox Code Playgroud)

哪里4是平行度.