有条件地使用akka流跳过流

Fal*_*rri 13 akka akka-stream akka-http

我正在使用akka流,我有一段我的图形,我需要有条件地跳过,因为流不能处理某些值.具体来说,我有一个获取字符串并发出http请求的流,但是当字符串为空时服务器无法处理这种情况.但我需要返回一个空字符串.有没有办法做到这一点,而不必通过http请求知道它会失败?我基本上有这个:

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
Run Code Online (Sandbox Code Playgroud)

我唯一能想到的就是在我的httpResponse流中捕获400错误并返回默认值.但我希望能够避免因为我知道事先会失败的请求而命中服务器的开销.

Vik*_*ang 16

你可以使用flatMapConcat:

(警告:从未编译过,但你会得到它的要点)

val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val makeHttpCall: Flow[HttpRequest, HttpResponse, _]
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
val cleanerSource = source.flatMapConcat({
  case "" => emptyStringSource
  case other => Source.single(other) via someHttpTransformation
})
Run Code Online (Sandbox Code Playgroud)

  • 当我使用明确的点时,我倾向于使用括号。 (2认同)

Ram*_*gil 13

Viktor Klang的解决方案简洁而优雅.我只想用Graphs演示另一种选择.

您可以将字符串源拆分为两个流,并过滤一个流以获取有效的字符串,将另一个流过滤为无效的字符串.然后合并结果(" 跨流 ").

根据文档:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink : Sink[String,_] = ???

  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  val validReq =   Flow[String].filter(_.size > 0)
  val invalidReq = Flow[String].filter(_.size == 0)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~>      invalidReq                    ~> merge
  ClosedShape
})
Run Code Online (Sandbox Code Playgroud)

注意:此解决方案会拆分流,因此Sink可能会按照与基于输入的预期不同的顺序处理字符串值结果.

  • 是! 当您可以重新排序操作时,这是另一种选择(因为http-flow和非http并行操作) (2认同)