标签: akka-stream

使用Akka-Streams HTTP将整个HttpResponse主体作为String获取

我正在尝试了解如何使用新akka.http库.我想向服务器发送一个http请求,并将整个响应主体作为单个String读取,以便生成一个Source[String,?].

这是迄今为止我能够制作的最佳解决方案:

 def get(
   modelID: String,
   pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
 ): Source[String,Unit] = {
   val uri = reactionsURL(modelID)
   val req = HttpRequest(uri = uri)
   Source.single( (req,0) )
     .via( pool )
     .map { 
       case (Success(resp),_) =>
         resp.entity.dataBytes.map( _.decodeString("utf-8") )
     }.flatten(FlattenStrategy.concat)
     .grouped( 1024 )
     .map( _.mkString )
Run Code Online (Sandbox Code Playgroud)

它似乎工作得很好(除了缺少的错误路径),但对于这样简单的任务来说它有点笨拙.有更聪明的解决方案吗?我可以避免grouped/ mkString

scala http akka akka-stream

8
推荐指数
2
解决办法
9304
查看次数

Akka HTTP:如何将Json格式响应解组为域对象

我正在尝试Akka HTTP,我已经创建了一个服务,在HttpResponse中返回一个域对象的Json数组.在客户端中,我想将其转换为域对象的源,以便后续的流和接收器可以使用它.

参考Json支持部分:http: //doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html

我已经完成了定义隐式RootJsonReader等的必要,但我不知道如何使用FromEntityUnmarshaller.

我的代码在这里:https: //github.com/charlesxucheng/akka-http-microservice

它基于akka-http-microservice激活器模板.Service2.scala是我的服务器实现,并且正在运行.AkkaHttpClient.scala是客户端实现,它不完整.

要构建,请使用Gradle作为build.sbt不是最新的.

谢谢.

json scala reactive-streams akka-stream akka-http

8
推荐指数
1
解决办法
1306
查看次数

Akka HTTP Streaming JSON反序列化

是否有可能从Akka HTTP 动态反序列化未知长度的外部ByteString流到域对象?


上下文

我称一个无限长的HTTP端点输出一个JSON Array不断增长的端点:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight
Run Code Online (Sandbox Code Playgroud)

json akka akka-stream akka-http

8
推荐指数
1
解决办法
1375
查看次数

监控已关闭的图形Akka Stream

如果我RunningGraph在Akka Stream 创建了一个,我怎么知道(从外面)

  1. 当所有节点因完成而被取消时?
  2. 当所有节点因错误而停止时?

scala akka-stream

8
推荐指数
1
解决办法
3981
查看次数

Akka Streams Websocket接线

我正试图找出使用akka-http和akka-streams实现真正的websocket应用程序的最佳方法.我最想要的是简单性,我现在还没有得到它.

假设您有一个相当复杂的管道,需要区分多个请求,有时会将请求发送给actor进行处理,有时会发出mongo查询并返回响应,有时会在REST API上执行PUT等.

与那里的简单聊天应用程序示例不同,出现至少3个似乎没有标准解决方案的问题:

  • 有条件地跳过响应,例如,因为客户端不期望该请求将收到响应.如果我使用从消息到消息的典型流程,一旦请求到达其目标,我需要阻止它进一步传播回websocket.它可以使用特殊的过滤器(涉及一些痛苦)或使用各种其他方式(例如,使用akka流有条件地跳过流)来完成,但这增加了许多样板和复杂性.理想情况下,我希望能够插入跳过其他所有内容的"跳过"消息.

  • 将传入消息路由到适当的位置(例如,actor,mongo).再一次,我可以找到涉及大量样板的解决方案(例如,在不处理此类请求的分支处进行广播和过滤).理想情况下,我应该能够定义如下内容:如果消息是X,则将其发送到那里,如果消息是Y,则将其发送到那里等.

  • 将错误传播回客户端.非常类似于上面描述的路由问题.例如,如果JSON解析失败,我需要添加一个单独的路径(广播+合并),我发送错误消息,但如果在下一阶段发生错误,我甚至无法轻易地重用相同的路径,我想将该错误传播给用户.理想情况下,我应该有一个单独的错误处理路径,可以在流中的任意点使用,完全绕过流的其余部分并返回到客户端.

目前,我有这个非常复杂的图表,跨越15行,路径经过> 20个不同的阶段,我真的很担心要保持这个解决方案的复杂性.DSL在这个尺寸上几乎是不可读的.我当然可以更好地模块化,但这对于一些应该简单得多的事情来说就像是一个疯狂的麻烦.

我错过了什么吗?考虑到akka-streams这样的任务,我是疯了吗?任何可以让我控制所有复杂性的想法或代码示例?

提前致谢!

modularity scala websocket akka akka-stream

8
推荐指数
1
解决办法
370
查看次数

链中的Akka-http-client链请求

我想使用akka-http-client链接http请求作为Stream.链中的每个http请求都取决于先前请求的成功/响应,并使用它来构造新请求.如果请求不成功,则Stream应返回不成功请求的响应.

如何在akka-http中构建这样的流?我应该使用哪个akka-http客户端级API?

scala akka-stream akka-http

8
推荐指数
1
解决办法
1916
查看次数

特别处理Akka流的第一个元素

是否有一种Source以特殊方式处理Akka流的第一个元素的惯用方法?我现在拥有的是:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }
Run Code Online (Sandbox Code Playgroud)

谢谢

scala akka akka-stream

8
推荐指数
1
解决办法
1382
查看次数

重用akka-stream流的优雅方式

我正在寻找一种轻松重用akka-stream流的方法.

我把Flow打算重新用作函数,所以我想保留它的签名:

Flow[Input, Output, NotUsed]

现在,当我使用此流程时,我希望能够"调用"此流程并将结果保留在一边以便进一步处理.

所以我想从Flow发射开始[Input],应用我的流程,然后继续Flow发射[(Input, Output)].

例:

val s: Source[Int, NotUsed] = Source(1 to 10)

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)

val via: Source[(Int, String), NotUsed] = ???
Run Code Online (Sandbox Code Playgroud)

现在这不可能以一种简单的方式进行,因为将流量与.via()流量相结合会使流量发光[Output]

val via: Source[String, NotUsed] = s.via(stringIfEven)
Run Code Online (Sandbox Code Playgroud)

另一种方法是使我的可重用流发出[(Input, Output)]但这需要每个流将其输入推送到所有阶段并使我的代码看起来很糟糕.

所以我想出了一个像这样的组合器:

def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val broadcast = b.add(Broadcast[In](2))
  val zip = b.add(Zip[In, Out])

  broadcast.out(0) ~> zip.in0
  broadcast.out(1) …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream

8
推荐指数
1
解决办法
970
查看次数

Akka Streams - 如何在图形中保持辅助接收器的物化值

我有一个函数返回一个函数,Flow其逻辑涉及将图形的一些元素Sink传递给作为参数传递的辅助.我想保留辅助Sink的物化值,这样我就可以在构造流启动时对其值进行操作.

这是我正在建设的流程的粗略图片:

IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
                                      ~> OutFilter ~> OUT
Run Code Online (Sandbox Code Playgroud)

示例代码:

case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
  val isPersistent = Flow[Element].collect {
    case persistent: Persistent => persistent
  }

  val isRunning = Flow[Element].collect {
    case out: Outcoming => out
  }

  val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
    .map(_ => …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream

7
推荐指数
1
解决办法
1535
查看次数

数据流的 SHA256

我有一个用例,我使用 GET 请求从 URL 下载文件。是否可以在不保存到磁盘或将整个对象保存在内存中的情况下计算文件流的 SHA256?

scala akka-stream akka-http

7
推荐指数
1
解决办法
1587
查看次数