我正在尝试了解如何使用新akka.http库.我想向服务器发送一个http请求,并将整个响应主体作为单个String读取,以便生成一个Source[String,?].
这是迄今为止我能够制作的最佳解决方案:
def get(
modelID: String,
pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
): Source[String,Unit] = {
val uri = reactionsURL(modelID)
val req = HttpRequest(uri = uri)
Source.single( (req,0) )
.via( pool )
.map {
case (Success(resp),_) =>
resp.entity.dataBytes.map( _.decodeString("utf-8") )
}.flatten(FlattenStrategy.concat)
.grouped( 1024 )
.map( _.mkString )
Run Code Online (Sandbox Code Playgroud)
它似乎工作得很好(除了缺少的错误路径),但对于这样简单的任务来说它有点笨拙.有更聪明的解决方案吗?我可以避免grouped/ mkString?
我正在尝试Akka HTTP,我已经创建了一个服务,在HttpResponse中返回一个域对象的Json数组.在客户端中,我想将其转换为域对象的源,以便后续的流和接收器可以使用它.
参考Json支持部分:http: //doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html
我已经完成了定义隐式RootJsonReader等的必要,但我不知道如何使用FromEntityUnmarshaller.
我的代码在这里:https: //github.com/charlesxucheng/akka-http-microservice
它基于akka-http-microservice激活器模板.Service2.scala是我的服务器实现,并且正在运行.AkkaHttpClient.scala是客户端实现,它不完整.
要构建,请使用Gradle作为build.sbt不是最新的.
谢谢.
是否有可能从Akka HTTP 动态反序列化未知长度的外部ByteString流到域对象?
我称一个无限长的HTTP端点输出一个JSON Array不断增长的端点:
[
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
...
] <- Never sees the daylight
Run Code Online (Sandbox Code Playgroud) 如果我RunningGraph在Akka Stream 创建了一个,我怎么知道(从外面)
我正试图找出使用akka-http和akka-streams实现真正的websocket应用程序的最佳方法.我最想要的是简单性,我现在还没有得到它.
假设您有一个相当复杂的管道,需要区分多个请求,有时会将请求发送给actor进行处理,有时会发出mongo查询并返回响应,有时会在REST API上执行PUT等.
与那里的简单聊天应用程序示例不同,出现至少3个似乎没有标准解决方案的问题:
有条件地跳过响应,例如,因为客户端不期望该请求将收到响应.如果我使用从消息到消息的典型流程,一旦请求到达其目标,我需要阻止它进一步传播回websocket.它可以使用特殊的过滤器(涉及一些痛苦)或使用各种其他方式(例如,使用akka流有条件地跳过流)来完成,但这增加了许多样板和复杂性.理想情况下,我希望能够插入跳过其他所有内容的"跳过"消息.
将传入消息路由到适当的位置(例如,actor,mongo).再一次,我可以找到涉及大量样板的解决方案(例如,在不处理此类请求的分支处进行广播和过滤).理想情况下,我应该能够定义如下内容:如果消息是X,则将其发送到那里,如果消息是Y,则将其发送到那里等.
将错误传播回客户端.非常类似于上面描述的路由问题.例如,如果JSON解析失败,我需要添加一个单独的路径(广播+合并),我发送错误消息,但如果在下一阶段发生错误,我甚至无法轻易地重用相同的路径,我想将该错误传播给用户.理想情况下,我应该有一个单独的错误处理路径,可以在流中的任意点使用,完全绕过流的其余部分并返回到客户端.
目前,我有这个非常复杂的图表,跨越15行,路径经过> 20个不同的阶段,我真的很担心要保持这个解决方案的复杂性.DSL在这个尺寸上几乎是不可读的.我当然可以更好地模块化,但这对于一些应该简单得多的事情来说就像是一个疯狂的麻烦.
我错过了什么吗?考虑到akka-streams这样的任务,我是疯了吗?任何可以让我控制所有复杂性的想法或代码示例?
提前致谢!
我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.
如何在akka-http中构建这样的流?我应该使用哪个akka-http客户端级API?
是否有一种Source以特殊方式处理Akka流的第一个元素的惯用方法?我现在拥有的是:
var firstHandled = false
source.map { elem =>
if(!firstHandled) {
//handle specially
firstHandled = true
} else {
//handle normally
}
}
Run Code Online (Sandbox Code Playgroud)
谢谢
我正在寻找一种轻松重用akka-stream流的方法.
我把Flow打算重新用作函数,所以我想保留它的签名:
Flow[Input, Output, NotUsed]
现在,当我使用此流程时,我希望能够"调用"此流程并将结果保留在一边以便进一步处理.
所以我想从Flow发射开始[Input],应用我的流程,然后继续Flow发射[(Input, Output)].
例:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
Run Code Online (Sandbox Code Playgroud)
现在这不可能以一种简单的方式进行,因为将流量与.via()流量相结合会使流量发光[Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
Run Code Online (Sandbox Code Playgroud)
另一种方法是使我的可重用流发出[(Input, Output)]但这需要每个流将其输入推送到所有阶段并使我的代码看起来很糟糕.
所以我想出了一个像这样的组合器:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) …Run Code Online (Sandbox Code Playgroud) 我有一个函数返回一个函数,Flow其逻辑涉及将图形的一些元素Sink传递给作为参数传递的辅助.我想保留辅助Sink的物化值,这样我就可以在构造流启动时对其值进行操作.
这是我正在建设的流程的粗略图片:
IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
~> OutFilter ~> OUT
Run Code Online (Sandbox Code Playgroud)
示例代码:
case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
val isPersistent = Flow[Element].collect {
case persistent: Persistent => persistent
}
val isRunning = Flow[Element].collect {
case out: Outcoming => out
}
val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
.map(_ => …Run Code Online (Sandbox Code Playgroud) 我有一个用例,我使用 GET 请求从 URL 下载文件。是否可以在不保存到磁盘或将整个对象保存在内存中的情况下计算文件流的 SHA256?