Som*_*ame 4 functional-programming scala scala-cats fs2
我有一个fs2.Stream可能会遇到错误的无限。我想跳过这些错误而不做任何事情(可能是日志)并继续流式传输更多元素。例子:
//An example
val stream = fs2.Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
Run Code Online (Sandbox Code Playgroud)
在这种特殊情况下,我想获得无限fs2.Stream的Left(new RuntimeException)发射每一秒。
有一种Stream.attempt方法可以生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续拉取更多元素?
这IO.raiseError(new RuntimeException).attempt通常不会起作用,因为它需要在流管道组合的所有位置尝试所有效果。
没有办法以您描述的方式处理错误。当流遇到第一个错误时,它被终止。请检查这个gitter 问题。
您可以通过两种方式处理它:
尝试效果(但您已经提到在您的情况下这是不可能的)。
流终止后重新启动流:
val stream: Stream[IO, Either[Throwable, Unit]] = Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
.handleErrorWith(t => Stream(Left(t)) ++ stream) //put Left to the stream and restart it
//since stream will infinetelly restart I take only 3 first values
println(stream.take(3).compile.toList.unsafeRunSync())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
457 次 |
| 最近记录: |