Scala, Play, Akka, WebSocket: how to pass actor messages through a WebSocket

Cab*_*ero 7 scala websocket akka playframework playframework-2.0

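I have an actor that is started with the application and runs in the background, watching for certain changes and reporting any it finds. At the moment it just prints them to the console with println. What I need is to send each new message to the front end over a WebSocket.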

This is my Play Global object, where the monitoring/listening actor is started:

object Global extends GlobalSettings {

    override def onStart(app: Application) {

        class Listener extends Actor {
            //This needs to be changed to pass messages to Websocket, how?
            def receive = {
                case Create(path) => println("CREATE " + path)
                case Delete(path) => println("DELETE " + path)
                case Modify(path) => println("MODIFY " + path)
            }
        }

        val listener = Akka.system.actorOf(Props[Listener], "listener")
        val swatch = Akka.system.actorOf(Props[SwatchActor], "swatch")
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true, Some(listener))

    }

}

This is my Play controller:

object Application extends Controller {

    def test = WebSocket.using[String] { request =>

        //This hopefully gets the listener actor reference?
        val listener = Akka.system.actorSelection("/user/listener")

        val (out, channel) = Concurrent.broadcast[String]
        val in = Iteratee.foreach[String] { msg =>
            //Actor messages must be pushed here, how?
            channel push("RESPONSE: " + msg)
        }

        (in, out)

    }   

}

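I know that for a WebSocket connection to be established there has to be an initial "in".

So my questions are:

  1. How do I modify the Listener actor so that it pushes messages to the WebSocket?
  2. Once the WebSocket connection is established, what do I need to do to prepare the actor to push messages?
  3. How do I push messages from the listener actor to the WebSocket?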

Cab*_*ero 6

I have found a solution.

A case class, which has to be imported from a separate file:

import play.api.libs.iteratee.Concurrent

case class Start(out: Concurrent.Channel[String])

Global object:

object Global extends GlobalSettings {

    override def onStart(app: Application) {

        class Listener extends Actor {
            //Placeholder channel: nothing consumes its enumerator, it is replaced when Start arrives
            var out = {
                val (_, chan) = Concurrent.broadcast[String]
                chan
            }
            def receive = {
                //Websocket channel out is set here
                case Start(out) => this.out = out
                //Pushing messages to Websocket
                case Create(path) => this.out.push(path.toString)
                case Delete(path) => this.out.push(path.toString)
                case Modify(path) => this.out.push(path.toString)
            }
        }

        val listener = Akka.system.actorOf(Props[Listener], "listener")
        val swatch = Akka.system.actorOf(Props[SwatchActor], "swatch")
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true, Option(listener))

    }

}

Play controller:

object Application extends Controller {

    def test = WebSocket.using[String] { request =>

        val (out, channel) = Concurrent.broadcast[String]

        val listener = Akka.system.actorSelection("akka://application/user/listener")
        //This is where the websocket out channel is being passed to the listener actor
        listener ! Start(channel)

        val in = Iteratee.foreach[String] { msg =>
            channel push("RESPONSE: " + msg)
        }

        (in, out)

    }   

}
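One thing to be aware of with the solution above: the Listener keeps a single `out`, and every `Start` overwrites it, so only the most recently connected WebSocket receives events. If several clients need them, one option is to keep a set of channels instead. The following is only a rough sketch of that idea in plain Scala, without Play or Akka, so it can be run on its own. `Channel`, `RecordingChannel`, `Stop` and `ListenerState` are hypothetical names introduced for illustration, and `Create`/`Delete`/`Modify` here are simplified stand-ins that carry a `String` path rather than the real file watcher events.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Concurrent.Channel[String]; only push is modelled.
trait Channel { def push(msg: String): Unit }

// Test double that records what was pushed, in place of a real WebSocket.
final class RecordingChannel extends Channel {
  val received = mutable.ArrayBuffer.empty[String]
  def push(msg: String): Unit = received += msg
}

// Simplified messages. Stop is an assumption: it is not part of the answer above.
sealed trait Msg
final case class Start(out: Channel) extends Msg
final case class Stop(out: Channel) extends Msg
final case class Create(path: String) extends Msg
final case class Delete(path: String) extends Msg
final case class Modify(path: String) extends Msg

// Plays the role of the Listener actor's receive block, as a plain class.
final class ListenerState {
  // One entry per open connection, instead of a single overwritten var.
  private val outs = mutable.Set.empty[Channel]

  def receive(msg: Msg): Unit = msg match {
    case Start(out)   => outs += out              // a client connected
    case Stop(out)    => outs -= out              // a client went away
    case Create(path) => broadcast("CREATE " + path)
    case Delete(path) => broadcast("DELETE " + path)
    case Modify(path) => broadcast("MODIFY " + path)
  }

  // Push the same text to every registered channel.
  private def broadcast(text: String): Unit = outs.foreach(_.push(text))
}
```

In the real actor the same `match` would live inside `receive`, and the controller would send `Start(channel)` as it does now. Sending something like `Stop(channel)` when the connection closes is left as an assumption here; how to detect the close depends on the Play version in use.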