我知道已经有人问过,但我似乎无法找到答案.这是我的代码:
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
final case class Client(clientId:Int, clientName:String, platformIds:Int, host:String, password:String)
object ClientJson extends DefaultJsonProtocol with SprayJsonSupport {
implicit val clientFormat = jsonFormat5(Client)
}
class HTTPListenerActor extends Actor with ImplicitMaterializer with RoadMap {
implicit val conf = context.system.settings.config
implicit val system = context.system
implicit val ec = context.dispatcher
Await.result(Http().bindAndHandle(roads, "interface", 8080), Duration.Inf)
override def receive:Receive = Actor.emptyBehavior
}
trait RoadMap extends Directives {
val roads: Route = path("client"/IntNumber) { id =>
import ClientJson._
post {
entity(as[Client]) { c …Run Code Online (Sandbox Code Playgroud) 我正在尝试调用谷歌地理编码API并检索响应.
lazy val geoCodingConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
Http().outgoingConnectionHttps(config.getString("services.geoCodingApiHost"), config.getInt("services.geoCodingApiPort"))
def geoCodingRequest(request: HttpRequest): Future[HttpResponse] = Source.single(request).via(geoCodingConnectionFlow).runWith(Sink.head)
/**
* This call to google service is limited
* @see https://developers.google.com/maps/documentation/geocoding/#Limits
*/
def ?(l: GeoLocation)(implicit ec: ExecutionContext): Future[Either[String, List[Result]]] = {
val latlang = s"17.3644264,78.3896741"
import org.json4s.native.Serialization
import org.json4s.NoTypeHints
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling._
implicit val materializer = ActorMaterializer()
implicit val executor = system.dispatcher
implicit val formats = Serialization.formats(NoTypeHints)
geoCodingRequest(RequestBuilding.Get(s"${config.getString("services.geoCodingApiUrlPart")}?latlng=$latlang&key=${config.getString("services.geoCodingApiKey")}")).flatMap { response =>
val nonBinaryType = ContentTypes.`application/json`
def responseEntity: HttpEntity = response.entity
response.status match …Run Code Online (Sandbox Code Playgroud) 我正在尝试建立一个简单的akka-http 2.4.2项目来对其进行测试,但是我没有这样做。
我的built.sbt:
import NativePackagerHelper._
lazy val akkaVersion = "2.4.2"
lazy val root = (project in file(".")).
settings(
name := "akkTest",
version := "0.1",
scalaVersion := "2.11.7")
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
)
enablePlugins(JavaServerAppPackaging)
Run Code Online (Sandbox Code Playgroud)
我在Main.scala中的代码段
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.actor.ActorSystem
object Main extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val serverSource =
Http().bind(interface = "localhost", port = 8080) …Run Code Online (Sandbox Code Playgroud) 给定Scala中的以下单例对象:
package demo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.io.StdIn
object WebServer extends App {
implicit val system = ActorSystem("myActorSystem")
implicit val executionContext = system.dispatcher
implicit val materializer = ActorMaterializer()
val route = {
path("api" / "done-as-promised") {
get {
complete {
Future.successful("done")
}
}
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
}
Run Code Online (Sandbox Code Playgroud)
并进行以下单元测试
package demo
import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.{Inspectors, Matchers, WordSpec}
class WebServerSpec extends WordSpec with Matchers with TypeCheckedTripleEquals with Inspectors …Run Code Online (Sandbox Code Playgroud) 我有这条路线:
path("rus") {
complete("??????!")
}
Run Code Online (Sandbox Code Playgroud)
当我使用浏览器(chrome)进入/ rus时,得到以下输出:
“ !!”!
为什么?响应头是:
HTTP/1.1 200 OK
Server: akka-http/2.4.10
Date: Mon, 10 Oct 2016 22:31:53 GMT
Content-Type: application/json
Content-Length: 15
Run Code Online (Sandbox Code Playgroud)
我曾经使用过喷雾,但现在我想使用akka http,我没有遇到过这样的问题。
当我卷曲此路径时,我得到正常输出
$ curl http://localhost:9010/rus
"??????!"
Run Code Online (Sandbox Code Playgroud)
我看到响应标头'Content-Type'应该是'application / json; charset = utf-8'但缺少charset ...
我正在通过webSocketClientFlow上的 doc来尝试客户端websocket .
示例代码是:
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
// send this as a message over the WebSocket
val outgoing = …Run Code Online (Sandbox Code Playgroud) 所以这是我的websocket服务器实现.
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
.via(chatActorFlow(UUID.randomUUID()))
.map(event => TextMessage.Strict(protocol.serialize(event)))
def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {
val sink = Flow[Protocol.Message]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))
val source = Source
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}
Flow.fromSinkAndSource(sink, source)
}
Run Code Online (Sandbox Code Playgroud)
我不知道是否有什么办法一旦类型的消息,关闭连接ConnectionClosed是通过发送chatRef?
我有以下代码:
val route:Route={
path("hello"){
get{
complete{
"done"
}
}
}
}
Http().bindAndHandle(route, "localhost", 8187)
Run Code Online (Sandbox Code Playgroud)
这里完整的回复字符串"完成".但是,我希望它返回200的状态代码.我该怎么做?
我刚刚开始使用Akka Http(和Scala),并且想知道是否有任何明确定义的模式来构造Akka代码.特别是,我正在寻找动态组合/聚合路由的结构模式.特别是,我正在寻找类似于下面的伪代码的解决方案:
trait MyActor extends Actor {
val myRouter: ActorRef = context.actorOf(FromConfig.props(Props[MyWorker]), "Worker")
val myRoute = .... //route definition for this trait
}
trait MySecondActor extends Actor {
val mySecondRouter: ActorRef = context.actorOf(FromConfig.props(Props[MySecondWorker]), "SecondWorker")
val myRoute = .... //route definition for this trait
}
Run Code Online (Sandbox Code Playgroud)
然后,在我的主服务器中混合使用特征来自动获取actor和路由:
class HttpServer extends SomeTrait with MyActor with MySecondActor {
.....
.....
}
Run Code Online (Sandbox Code Playgroud)
上述模式存在一些明显的问题,包括:
new HttpServer()我正在寻找的是一种模式:
我在StackOverflow上遇到了以下两个,但是想知道是否有更好的方法和明确定义的模式:
具有多个路由配置的akka-http(这不是真正的动态)
如何使用特征聚合akka-http路由?(使用反射的旧问题)
谢谢!
我过去曾成功地使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 是按照文档中所示的方式定义和工作的。
由于 WebSocket 连接允许全双工通信,因此我希望这种连接由 Akka HTTP 中的两个独立流表示,一个用于传入流量,另一个用于传出流量。事实上,文档说明如下:
WebSocket 由两个消息流组成 [...]
它还进一步指出,传入消息由 a 表示Sink,传出消息由 a 表示Source。这是我的第一个困惑点 - 当使用两个单独的流时,您可能希望总共处理两个源和两个接收器,而不是每种类型中的一个。目前,我的猜测是传入流的源以及传出流的接收器对开发人员来说并没有多大用处,因此只是“隐藏”。
但是,将所有内容连接在一起时确实会令人困惑(请参阅上面链接的文档)。
使用时有问题的部分singleWebSocketRequest:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
Run Code Online (Sandbox Code Playgroud)
或使用时相同的部分webSocketClientFlow:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
Run Code Online (Sandbox Code Playgroud)
这与我目前对流工作流程的理解相矛盾。
Source用于传出消息和Sink用于传入消息?上面的代码看起来像我向自己而不是服务器发送消息。Flow[Message, Message, ...]?将传出消息转换为传入消息似乎没有意义。感谢任何有助于提高我理解的帮助,谢谢。
编辑:
我在使用Source和Sink通过 WebSocket 发送数据时没有问题,我只是想了解为什么阶段的接线是这样完成的。
akka-http ×10
scala ×9
akka ×7
akka-stream ×2
akka-cluster ×1
json ×1
json4s ×1
sbt ×1
sbt-assembly ×1
spray ×1
spray-json ×1