Cec*_*ile 7 streaming scala playframework
我正在使用Scala构建一个使用Play Framework 2.3.x的微服务(我是两者中的初学者),但我无法想出一种流式传输我的请求体的方法.
这是问题所在:
我需要一个端点/transform,我可以收到一个巨大的TSV文件,我将以另一种格式解析和渲染:简单的转换.问题是我的控制器中的每个命令都"运行得太晚".它等待在启动代码之前接收完整文件.
例:
def transform = Action.async {
Future {
Logger.info("Too late")
Ok("A response")
}
}
Run Code Online (Sandbox Code Playgroud)
我希望能够在上传过程中逐行读取请求正文并处理请求,而不必等待文件完全接收.
任何提示都会受到欢迎.
Mik*_*ame 11
此答案适用于Play 2.5.x及更高版本,因为它使用的Akka流API取代了该版本中Play的基于Iteratee的流式传输.
基本上,您可以创建一个返回Source[T]可以传递给的主体解析器Ok.chunked(...).一种方法是Accumulator.source[T]在body解析器中使用.例如,刚刚返回发送给它的数据的操作可能如下所示:
def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
// Return the source directly. We need to return
// an Accumulator[Either[Result, T]], so if we were
// handling any errors we could map to something like
// a Left(BadRequest("error")). Since we're not
// we just wrap the source in a Right(...)
Accumulator.source[ByteString]
.map(Right.apply)
}
def stream = Action(verbatimBodyParser) { implicit request =>
Ok.chunked(request.body)
}
Run Code Online (Sandbox Code Playgroud)
如果你想做一些像转换TSV文件那样你可以使用a Flow来转换源,例如:
val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>
val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
// Chunk incoming bytes by newlines, truncating them if the lines
// are longer than 1000 bytes...
.via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
// Replace tabs by commas. This is just a silly example and
// you could obviously do something more clever here...
.map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))
Accumulator.source[ByteString]
.map(_.via(transformFlow))
.map(Right.apply)
}
def convert = Action(tsvToCsv) { implicit request =>
Ok.chunked(request.body).as("text/csv")
}
Run Code Online (Sandbox Code Playgroud)
Play文档中的" 指导身体其他部分"部分可能会有更多灵感.
| 归档时间: |
|
| 查看次数: |
2197 次 |
| 最近记录: |