用akka-stream过滤异步

nde*_*rge 2 akka-stream

是否有与该mapAsync()方法等效的方法,但适用于filter

这是使用伪代码的示例:

val filter: T => Future[Boolean] = /.../

source.filter(filter).runWith(/.../)
       ^^^^^^
Run Code Online (Sandbox Code Playgroud)

谢谢

Ram*_*gil 5

我认为没有直接的方法FlowSource该方法具有您要寻找的功能,但是将可用方法结合使用将为您带来所需的结果:

def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
                  (implicit ec : ExecutionContext) : Flow[T, T, _] =
  Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
         .filter(_._1)
         .map(_._2)
Run Code Online (Sandbox Code Playgroud)