Akka Streams Websocket接线

use*_*159 8 modularity scala websocket akka akka-stream

我正试图找出使用akka-http和akka-streams实现真正的websocket应用程序的最佳方法.我最想要的是简单性,我现在还没有得到它.

假设您有一个相当复杂的管道,需要区分多个请求,有时会将请求发送给actor进行处理,有时会发出mongo查询并返回响应,有时会在REST API上执行PUT等.

与那里的简单聊天应用程序示例不同,出现至少3个似乎没有标准解决方案的问题:

  • 有条件地跳过响应,例如,因为客户端不期望该请求将收到响应.如果我使用从消息到消息的典型流程,一旦请求到达其目标,我需要阻止它进一步传播回websocket.它可以使用特殊的过滤器(涉及一些痛苦)或使用各种其他方式(例如,使用akka流有条件地跳过流)来完成,但这增加了许多样板和复杂性.理想情况下,我希望能够插入跳过其他所有内容的"跳过"消息.

  • 将传入消息路由到适当的位置(例如,actor,mongo).再一次,我可以找到涉及大量样板的解决方案(例如,在不处理此类请求的分支处进行广播和过滤).理想情况下,我应该能够定义如下内容:如果消息是X,则将其发送到那里,如果消息是Y,则将其发送到那里等.

  • 将错误传播回客户端.非常类似于上面描述的路由问题.例如,如果JSON解析失败,我需要添加一个单独的路径(广播+合并),我发送错误消息,但如果在下一阶段发生错误,我甚至无法轻易地重用相同的路径,我想将该错误传播给用户.理想情况下,我应该有一个单独的错误处理路径,可以在流中的任意点使用,完全绕过流的其余部分并返回到客户端.

目前,我有这个非常复杂的图表,跨越15行,路径经过> 20个不同的阶段,我真的很担心要保持这个解决方案的复杂性.DSL在这个尺寸上几乎是不可读的.我当然可以更好地模块化,但这对于一些应该简单得多的事情来说就像是一个疯狂的麻烦.

我错过了什么吗?考虑到akka-streams这样的任务,我是疯了吗?任何可以让我控制所有复杂性的想法或代码示例?

提前致谢!

Ric*_*ich 2

这是一个非常广泛的问题,目前的形式可能无法回答。

Akka HTTP 在其 HTTP 处理层中解决了许多此类问题(例如空响应、路由、返回错误)。您能否利用在那里学到的一些经验教训并将其应用到您的系统中?或者,也许更好的是,您能否将系统从使用 websocket 通信转换为使用 HTTP 通信并直接使用该代码?