fs2.Stream 挂起两次

Som*_*ame 0 functional-programming scala scala-cats fs2 cats-effect

问题:

我想反复从fs2.Stream某些第三方库提供的一些批次中取出一些批次,因此将客户从fs2.Stream自身中抽象出来并简单地给它们F[List[Int]]在它们准备好后立即批处理。

尝试: 我尝试使用fs2.Stream::take并运行一些示例。

一世。

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()

r.unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)

它打印第一批List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),然后挂起。我预计从0到的所有批次都1000将被打印。

在这里让事情更简单一点是

二、

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
  _ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()

r.unsafeRunSync()
Run Code Online (Sandbox Code Playgroud)

行为与I完全相同。打印List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)然后挂起。

题:

给定fs2.Stream[IO, Int]如何提供IO[List[Int]]在评估时迭代流提供的连续批次的效果?

Lui*_*rez 5

好吧,你不能有一个IO[List[X]]代表多个批次的,那个IO将是一个批次。

你能做的最好的事情是这样的:

def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]
Run Code Online (Sandbox Code Playgroud)

也就是说,你的用户会给你一个操作来为每个批次执行,你会给他们一个 IO将阻止当前光纤的操作,直到使用该函数消耗整个流。

实现此类功能的最简单方法是:

def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
  getStreamFromThirdParty
    .chunkN(n = ChunkSize)
    .evalMap(chunk => process(chunk.toList))
    .compile
    .drain
Run Code Online (Sandbox Code Playgroud)