如何处理java.util.NoSuchElementException:减少空流akka流?

noo*_*ner 1 java scala akka akka-stream

我有一个过滤器来过滤掉流中不正确的项目。在某些边缘情况下,这可能会导致所有项目被过滤掉。发生这种情况时,流会 java.util.NoSuchElementException: reduce over empty stream因运行reduce时出现错误而失败。

如何处理这种情况以返回有意义的响应?

我尝试过像这样的监督 -

val decider: Supervision.Decider = {
    case _                      => Supervision.Stop
    case e : NoSuchElementException => Supervision.stop
  }

RunnableGraph.
toMat(Sink.reduce[Int](_ + _)
.withAttributes(ActorAttributes.supervisionStrategy(decider)))(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)

我也尝试过recover,但似乎没有任何效果。

我需要处理这个案例才能返回有意义的响应。

任何帮助将不胜感激。

Lev*_*sey 5

可能值得考虑使用Sink.fold而不是Sink.reduce

val possiblyEmpty = Source(Seq(1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = possiblyEmpty.toMat(Sink.fold[Int](0)(_ + _))(Keep.right).run()
Run Code Online (Sandbox Code Playgroud)

如果没有合理的零/恒等元素,您可以按照以下方式进行更概括的处理:

def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(f))
  }
  source.map(x => Some(x))
    .concat(Source.single(None))
    .statefulMapConcat[Option[T]] { () =>
      var emptyStream = true
      { x =>
        x match {
          case Some(x) =>
            // element from the given stream
            emptyStream = false
            List(x)

          case None =>
            // given stream completed
            if (emptyStream) {
              List(x)
            } else {
              Nil  // don't emit anything
            }
        }
      }
    }
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)

None如果没有元素,则返回的图将完成,或者Some是归约结果的 a。

编辑:您也可以只使用/orElse来代替上面的:SourceFlow.concat.statefulMapConcat

def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(f))
  }

  source.map(x => Some(x))
    .orElse(Source.single(None))
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
Run Code Online (Sandbox Code Playgroud)