noo*_*ner 1 java scala akka akka-stream
我有一个过滤器来过滤掉流中不正确的项目。在某些边缘情况下,这可能会导致所有项目被过滤掉。发生这种情况时,流会
java.util.NoSuchElementException: reduce over empty stream因运行reduce时出现错误而失败。
如何处理这种情况以返回有意义的响应?
我尝试过像这样的监督 -
val decider: Supervision.Decider = {
case _ => Supervision.Stop
case e : NoSuchElementException => Supervision.stop
}
RunnableGraph.
toMat(Sink.reduce[Int](_ + _)
.withAttributes(ActorAttributes.supervisionStrategy(decider)))(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)
我也尝试过recover,但似乎没有任何效果。
我需要处理这个案例才能返回有意义的响应。
任何帮助将不胜感激。
可能值得考虑使用Sink.fold而不是Sink.reduce:
val possiblyEmpty = Source(Seq(1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = possiblyEmpty.toMat(Sink.fold[Int](0)(_ + _))(Keep.right).run()
Run Code Online (Sandbox Code Playgroud)
如果没有合理的零/恒等元素,您可以按照以下方式进行更概括的处理:
def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
val lifted = { (x: Option[T], y: Option[T]) =>
x.flatMap(a => y.map(f))
}
source.map(x => Some(x))
.concat(Source.single(None))
.statefulMapConcat[Option[T]] { () =>
var emptyStream = true
{ x =>
x match {
case Some(x) =>
// element from the given stream
emptyStream = false
List(x)
case None =>
// given stream completed
if (emptyStream) {
List(x)
} else {
Nil // don't emit anything
}
}
}
}
.toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)
None如果没有元素,则返回的图将完成,或者Some是归约结果的 a。
编辑:您也可以只使用/orElse来代替上面的:SourceFlow.concat.statefulMapConcat
def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
val lifted = { (x: Option[T], y: Option[T]) =>
x.flatMap(a => y.map(f))
}
source.map(x => Some(x))
.orElse(Source.single(None))
.toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)