mor*_*man 5 scala akka playframework akka-stream akka-http
我正在尝试使用Play和akka流创建一个简单的Websocket连接代理.交通流量是这样的:
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
Run Code Online (Sandbox Code Playgroud)
我按照一些例子后提出了以下代码:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("The server has sent: " + message.text)
}
// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
Flow.fromSinkAndSource(sink, source)
}
finalFlow
Run Code Online (Sandbox Code Playgroud)
使用此代码,流量从客户端到代理服务器再到服务器,返回到代理服务器就是这样.它没有进一步向客户提供.我怎样才能解决这个问题 ?我想我需要以某种方式将接收serverIncoming器连接到接收器source中finalFlow,但我无法弄清楚如何做到这一点......
或者我对这种方法完全错了?使用a Bidiflow或a 更好Graph吗?我是akka流的新手,仍在努力解决问题.
以下似乎有效。注意:我已经在同一个控制器中实现了服务器套接字和代理套接字,但是您可以将它们分开或在不同的实例上部署相同的控制器。在这两种情况下,“upper”服务的 ws url 都需要更新。
package controllers
import javax.inject._
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import play.api.libs.streams.ActorFlow
import play.api.mvc._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
@Singleton
class SomeController @Inject()(implicit exec: ExecutionContext,
actorSystem: ActorSystem,
materializer: Materializer) extends Controller {
/*--- proxy ---*/
def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket"))
def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
Flow[String].map(s => TextMessage(s))
.via(websocketFlow)
.map(_.asTextMessage.getStrictText)
}
/*--- server ---*/
class UpperService(socket: ActorRef) extends Actor {
override def receive: Receive = {
case s: String => socket ! s.toUpperCase()
case _ =>
}
}
object UpperService {
def props(socket: ActorRef): Props = Props(new UpperService(socket))
}
def upperSocket: WebSocket = WebSocket.accept[String, String] { _ =>
ActorFlow.actorRef(out => UpperService.props(out))
}
}
Run Code Online (Sandbox Code Playgroud)
您需要像这样设置路由:
GET /upper-socket controllers.SomeController.upperSocket
GET /proxy-socket controllers.SomeController.proxySocket
Run Code Online (Sandbox Code Playgroud)
您可以通过向 ws://localhost:9000/proxy-socket 发送字符串来进行测试。答案将是大写字符串。
不过,1 分钟不活动后将会超时:
akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute
Run Code Online (Sandbox Code Playgroud)
但请参阅: http: //doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html了解如何配置它。