coo*_*les 1 scala akka akka-stream akka-http
我刚开始使用Akka和Scala,并尝试使用Akka Streams连接到WebSocket。我创建了SocketActor下面的示例,并尝试从main方法实例化。
这是我的SocketActor:
package com.lightbend.akka.sample
import akka.actor.{Actor, Props}
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object SocketActor {
def props(coinApiIdentifier: String): Props = Props(new SocketActor(coinApiIdentifier))
case object Start
case object Stop
}
class SocketActor(val ticker: String) extends Actor {
import SocketActor._
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
private val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
// send this as a message over the WebSocket
private val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
private val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://api.com/v1/"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
private val graph =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
override def receive: PartialFunction[Any, Unit] = {
case Start =>
println("Start message received.")
graph.run()
}
}
Run Code Online (Sandbox Code Playgroud)
而我的主要方法是:
object AkkaQuickstart extends App {
// Create the 'helloAkka' actor system
val system: ActorSystem = ActorSystem("test")
val materializer: ActorMaterializer = ActorMaterializer()
val socketActor: ActorRef =
system.actorOf(SocketActor.props("hello"), "socket-actor")
socketActor ! Start
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,我得到了错误:
错误:(38,35)找不到参数系统的隐式值:akka.actor.ActorSystem私有val webSocketFlow = Http()。webSocketClientFlow(WebSocketRequest(“ wss://api.com/v1/”))
我尝试过将一些implicit参数传递给的构造函数,SocketActor但效果不太好。似乎ActorSystem出于某种原因不在范围内。如何获得system有关该Http()功能的范围SocketActor?
定义一个隐式val:
class SocketActor(val ticker: String) extends Actor {
implicit val sys = context.system
// ...
}
Run Code Online (Sandbox Code Playgroud)
这将提供隐含ActorSystem的Http对象期望。
您的代码还有另一个问题:由于范围中没有实现器,因此actor中的流将不会运行。解决此问题的一种方法是在actor中创建实现器:
class SocketActor(val ticker: String) extends Actor {
implicit val sys = context.system
implicit val mat = ActorMaterializer()(context)
// ...
}
Run Code Online (Sandbox Code Playgroud)
请注意,如果将实现器定义为implicit val mat = ActorMaterializer(),则context.system由于会隐式使用implicit val sys = context.system。取而代之的是,物化器是使用参与者的context显式创建的。这样做是由于文档中的警告:
请勿通过将角色传递
context.system给角色在角色内部创建新的角色实现器。这将导致ActorMaterializer为每个这样的角色创建一个新的并且有可能泄漏(除非您明确将其关闭)。相反,建议您传入Materializer或使用actor的创建context。
推荐的方法允许actor的创建者重用物化器,该方法是将物化器作为隐式参数传递给actor:
class SocketActor(val ticker: String)(implicit val mat: ActorMaterializer) extends Actor {
implicit val sys = context.system
// ...
}
Run Code Online (Sandbox Code Playgroud)
然后,您可以将主程序中的实现器传递给该参与者:
object AkkaQuickstart extends App {
implicit val system: ActorSystem = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val socketActor: ActorRef =
system.actorOf(Props(classOf[SocketActor], "hello", materializer), "socket-actor")
socketActor ! Start
}
Run Code Online (Sandbox Code Playgroud)