FS2:是否可以正常完成Queue?

Pau*_*sak 3 scala fs2

假设我想将一些旧的异步API转换为FS2流。该API提供了一个包含3个回调的接口:下一个元素,成功,错误。我希望Stream发出所有元素,然后在收到成功或错误回调后完成。

FS2指南(https://functional-streams-for-scala.github.io/fs2/guide.html)建议fs2.Queue在这种情况下使用,并且对入队非常有用,但是到目前为止我看到的所有示例都希望queue.dequeue返回的流将永远不会完成-在我的情况下,没有明显的方法可以处理成功/错误回调。我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但是如果成功/错误回调在客户端从流读取数据之前到达,则流将过早终止-仍有未读元素。我希望消费者在完成流之前先阅读它们。

FS2是否可以做到这一点?对于Akka Streams来说,它是微不足道的- SourceQueueWithComplete具有completefail方法。

更新:通过在Option中包装元素并考虑将None作为停止读取流的信号,以及通过使用Promise传播错误,我能够获得足够好的结果:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
Run Code Online (Sandbox Code Playgroud)

但是,我是否忽略了更自然的方法?

Dae*_*yth 5

一种惯用的方法是创建一个,Queue[Option[A]]而不是Queue[A]。入队时,请包裹入,Some您可以显式入队None以完成信号。在出队方面,do q.dequeue.unNoneTerminate会为您提供一个Stream[F, A]在队列发出后终止的None


adh*_*nem 5

更新的答案:unNoneTerminate与结合使用rethrow,当它引发可投掷的东西时,它取a Stream[F, Either[Throwable, A]]并返回Stream[F, A]错误Stream.raiseError

然后,您的完整堆栈将为a Stream[F, Either[Throwable, Option[A]]],您可以Stream[F,A]通过调用将其展开.rethrow.unNoneTerminate