在Play 2.5中编写BodyParser

gre*_*ghz 5 scala playframework akka-stream

给定具有此签名的函数:

def parser[A](otherParser: BodyParser[A]): BodyParser[A]
Run Code Online (Sandbox Code Playgroud)

如何在传递请求体之前检查和验证请求体的方式来编写函数otherParser

为简单起见,我想说我想验证标题("Some-Header",或许)有一个与主体完全匹配的值.所以如果我有这个动作:

def post(): Action(parser(parse.tolerantText)) { request =>
  Ok(request.body)
}
Run Code Online (Sandbox Code Playgroud)

当我提出类似请求时curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post,应该在状态为200的响应正文中返回"hello".如果我的请求是curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post它应该返回400没有正文.

这是我尝试过的.

这个不编译因为otherParser(request).through(flow)期望flow输出a ByteString.这里的想法是流程可以通知累加器是否继续通过Either输出进行处理.我不知道如何让累加器知道上一步的状态.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
    if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
      Right(bytes)
    } else {
      Left(BadRequest)
    }
  }

  val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

  // This fails to compile because flow needs to output a ByteString
  acc.through(flow)
}
Run Code Online (Sandbox Code Playgroud)

我也试图使用过滤器.这个编译并且写入的响应主体是正确的.但是它始终返回200 Ok响应状态.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
    request.headers.get("Some-Header").contains(bytes.utf8String)
  }

  val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

  acc.through(flow)
}
Run Code Online (Sandbox Code Playgroud)

gre*_*ghz 3

我想出了一个使用GraphStageWithMaterializedValue. 这个概念是从Play 的maxLengthbody parser借用的。我在问题中的第一次尝试(无法编译)之间的主要区别在于,我不应该尝试改变流,而应该使用物化值来传达有关处理状态的信息。虽然我创建了一个,Flow[ByteString, Either[Result, ByteString], NotUsed]但事实证明我需要的是一个Flow[ByteString, ByteString, Future[Boolean]].

因此,我的parser函数最终看起来像这样:

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))

  val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink

  Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
    statusFuture.flatMap { success =>
      if (success) {
        resultFuture.map {
          case Left(result) => Left(result)
          case Right(a) => Right(a)
        }
      } else {
        Future.successful(Left(BadRequest))
      }
    }
  })
}
Run Code Online (Sandbox Code Playgroud)

关键是这一行:

val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
Run Code Online (Sandbox Code Playgroud)

一旦你能够创建这个流程,剩下的事情就迎刃而解了。不幸的BodyValidator是,它非常冗长,而且感觉有些乏味。无论如何,它大多都很容易阅读。GraphStageWithMaterializedValue期望您实现def shape: SS此处FlowShape[ByteString, ByteString])来指定该图的输入类型和输出类型。它还希望您实现def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)M此处是Future[Boolean])来定义图表实际应该做什么。这是完整的代码BodyValidator(我将在下面更详细地解释):

class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
  val in = Inlet[ByteString]("BodyValidator.in")
  val out = Outlet[ByteString]("BodyValidator.out")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val status = Promise[Boolean]()
    val bodyBuffer = new ByteStringBuilder()

    val logic = new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      setHandler(in, new InHandler {
        def onPush(): Unit = {
          val chunk = grab(in)
          bodyBuffer.append(chunk)
          push(out, chunk)
        }

        override def onUpstreamFinish(): Unit = {
          val fullBody = bodyBuffer.result()
          status.success(expected.map(ByteString(_)).contains(fullBody))
          completeStage()
        }

        override def onUpstreamFailure(e: Throwable): Unit = {
          status.failure(e)
          failStage(e)
        }
      })
    }

    (logic, status.future)
  }
}
Run Code Online (Sandbox Code Playgroud)

您首先要创建一个InletOutlet设置图形的输入和输出

val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
Run Code Online (Sandbox Code Playgroud)

然后您使用这些来定义shape.

def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
Run Code Online (Sandbox Code Playgroud)

在里面createLogicAndMaterializedValue你需要初始化你想要实现的值。在这里,我使用了一个承诺,当我从流中获得完整数据时可以解决该承诺。我还创建了一个ByteStringBuilder来跟踪迭代之间的数据。

val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
Run Code Online (Sandbox Code Playgroud)

然后我创建一个GraphStageLogic来实际设置该图在每个处理点的作用。正在设置两个处理程序。一种是InHandler用于处理来自上游源的数据。另一个是OutHandler用于处理向下游发送的数据。中没有什么真正有趣的东西OutHandler,所以我在这里忽略它,除了说它是必要的样板以避免IllegalStateException. 中重写了三个方法InHandleronPushonUpstreamFinishonUpstreamFailureonPush当来自上游的新数据准备就绪时调用。在这种方法中,我只需获取下一个数据块,将其写入bodyBuffer并将数据推送到下游。

def onPush(): Unit = {
  val chunk = grab(in)
  bodyBuffer.append(chunk)
  push(out, chunk)
}
Run Code Online (Sandbox Code Playgroud)

onUpstreamFinish当上游完成时调用(惊喜)。这是将正文与标头进行比较的业务逻辑发生的地方。

override def onUpstreamFinish(): Unit = {
  val fullBody = bodyBuffer.result()
  status.success(expected.map(ByteString(_)).contains(fullBody))
  completeStage()
}
Run Code Online (Sandbox Code Playgroud)

onUpstreamFailure实施这样,当出现问题时,我也可以将物化的未来标记为失败。

override def onUpstreamFailure(e: Throwable): Unit = {
  status.failure(e)
  failStage(e)
}
Run Code Online (Sandbox Code Playgroud)

然后我只返回GraphStageLogic我创建的status.future元组。