标签: akka-http

使用Akka-Http流式传输视频或(未知长度的流)

我正在研究一个实验项目的akka​​-http(akka-http-experimental_2.11 - 0.4).我之前没有参与喷涂工作.

我想流式传输mp4视频(大小可能会有所不同)到浏览器.但我不知道如何为HttpResponse创建HttpEntity(HttpEntity.Chunked?).我尝试过像这样的脏东西,这不是一种正确的方法,但这只适用于Firefox,只能提供单一请求.

def output =  Source.fromFile("C:\\Users\\karthik\\Downloads\\big_buck_bunny.mp4")(scala.io.Codec.ISO8859)

lazy val video = HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`video/mp4`, Flow(output.map(_.toByte).map(a => ChunkStreamPart(ByteString(a)))).toProducer(materializer)))
Run Code Online (Sandbox Code Playgroud)

当我在另一个选项卡或浏览器中打开相同的URL时,服务器无法处理该请求.由于这是一个实验性项目,因此没有足够的文档来处理大型文件流.

我得到了示例源代码表https://github.com/akka/akka/blob/release-2.3-dev/akka-http-core/src/test/scala/akka/http/TestServer.scala

我需要知道如何为HttpEntity.Chunked创建Producer.如果任何人可以用简单的术语解释,这将有助于理解API.

谢谢.

(PS:有人请在Stack Overflow中创建Akka-Http标签)

akka spray scala-2.11 akka-stream akka-http

6
推荐指数
1
解决办法
1555
查看次数

如何处理响应超时?

在akka-http路由中,我可以Future作为隐式转换为的响应返回ToResponseMarshaller.

有没有办法处理这个未来的超时?或路由级别的连接超时?或者一种方法是使用Await()功能?

现在客户端可以永远等待响应.

complete {
   val future = for {
     response <- someIOFunc()
     entity <- someOtherFunc()
   } yield entity
   future.onComplete({
     case Success(result) =>
       HttpResponse(entity = HttpEntity(MediaTypes.`text/xml`, result))
     case Failure(result) =>
       HttpResponse(entity = utils.getFault("fault"))
   })
   future
 }
Run Code Online (Sandbox Code Playgroud)

scala akka-http

6
推荐指数
1
解决办法
2461
查看次数

在定义路由指令时是否可以使用隐式参数?

我有一个指令,定义为

def allowedRoles(roles: UserRole*)(implicit login: Login): Directive0 = ???
Run Code Online (Sandbox Code Playgroud)

但我似乎无法在不必明确传递login参数的情况下使用它

def myRoutes(implicit req: HttpRequest, login: Login) = {
  path("example" / "path") {
    get {
      allowedRoles(Administrator) { // ? fails 
        handleGet
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

如果我尝试编译它失败,类型不匹配:

[error]  found   : akka.http.scaladsl.server.Route
[error]     (which expands to)  akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult]
[error]  required: com.example.Login
[error]         allowedRoles(Administrator) { handleGet } }
Run Code Online (Sandbox Code Playgroud)

如果我将标记线更改为allowedRoles(Administrator)(login)然后它可以工作,但似乎我不应该这样做,我错过了什么?

scala akka-http

6
推荐指数
1
解决办法
644
查看次数

Kafka给websocket留言

我正在尝试使用reactive-kafka,akka-http和akka-stream将一个Kafka使用者写入websocket流.

  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route 
  def mainFlow(): Route = {
    path("ws" / "commands" ) {
       handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    } 
  }
Run Code Online (Sandbox Code Playgroud)

从kafka使用者(这里省略),我做了一个publisherActor ! commandString动态添加内容到websocket.

但是,当我启动多个客户端到websocket时,我在后端遇到此异常:

[ERROR] …
Run Code Online (Sandbox Code Playgroud)

websocket akka-stream akka-http

6
推荐指数
1
解决办法
1023
查看次数

我如何计算akka当前的开放连接数

我正在研究akka http

  "com.typesafe.akka" %% "akka-actor" % "2.4.6",
  "com.typesafe.akka" % "akka-http-experimental_2.11" % "2.4.6"
Run Code Online (Sandbox Code Playgroud)

我正在测试简单的代码,如下所示.我的主要问题是如何增强它以获得关闭连接的通知,那么我可以打印当前打开的连接数?

object StatsRepo{
 val totConn = new AtomicInteger(0)
 val currOpenConn = new AtomicInteger(0) // how to count this?
}

object Boot2 extends App{
   implicit val system = ActorSystem("akka-http")
   implicit val materializer = ActorMaterializer()
   implicit val executionContext = system.dispatcher

   val requestHandler: HttpRequest => Future[HttpResponse] = {
       // do some work here...
   }
   val serverSource = Http().bind("0.0.0.0", 8080)


  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      StatsRepo.totConn.incrementAndGet()
     connection handleWithAsyncHandler requestHandler …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-http

6
推荐指数
1
解决办法
366
查看次数

Akka Http客户端:自定义标头

我正在尝试使用Akka-Http来调用REST URL.我是从akka文档中关注这个例子的.使用此我可以进行其余的通话.但我无法找到如何添加自定义请求标头.我尝试使用ModeledCustomHeader,但仍然请求没有标题.这是我的例子.

final class ApiTokenHeader(token: String) extends ModeledCustomHeader[ApiTokenHeader] {
  override def renderInRequests = false
  override def renderInResponses = false
  override val companion = ApiTokenHeader
  override def value: String = token
}
object ApiTokenHeader extends ModeledCustomHeaderCompanion[ApiTokenHeader] {
  override val name = "apiKey"
  override def parse(value: String) = Try(new ApiTokenHeader(value))
}
Run Code Online (Sandbox Code Playgroud)

这就是我调用的方式,

def invokeHttpRequest(cmd: WSRequestCommand) = {
    val s: HttpRequest = HttpRequest(uri = cmd.url).addHeader(ApiTokenHeader(cmd.apiKey))

    sender ! http.singleRequest(s)
  }
Run Code Online (Sandbox Code Playgroud)

而不是addHeader,我尝试使用addHeaders(),但Seq(ApiTokenHeader)不起作用,因为它给出了编译错误.

val s: HttpRequest = HttpRequest(uri = cmd.url, headers = Seq(ApiTokenHeader(cmd.apiKey)))
Run Code Online (Sandbox Code Playgroud)

错误:(55,66)类型不匹配; …

rest scala akka-http

6
推荐指数
1
解决办法
8890
查看次数

Akka HTTP Websocket,如何识别actor内部的连接

我正在研究scala中的简单多人游戏,我想通过websockets为JS客户端公开.

这是我的WebsocketServer类

class WebsocketServer(actorRef: ActorRef, protocol: Protocol, system: ActorSystem, materializer: ActorMaterializer) extends Directives {

    val route = get {
      pathEndOrSingleSlash {
        handleWebSocketMessages(websocketFlow)
      }
    }

    def websocketFlow: Flow[Message, Message, Any] =
      Flow[Message]
        .map {
          case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage)
        }
        .via(actorFlow)
        .map(event => TextMessage.Strict(protocol.serialize(event)))


    def actorFlow : Flow[Protocol.Message, Protocol.Event, Any] = {
      val sink =
        Flow[Protocol.Message]
          .to(Sink.actorRef[Protocol.Message](actorRef, Protocol.CloseConnection()))

      val source =
        Source.actorRef[Protocol.Event](1, OverflowStrategy.fail)
          .mapMaterializedValue(actor => actorRef ! Protocol.OpenConnection(actor))

      Flow.fromSinkAndSource(sink, source)
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我的actor的简化代码,它应该从websocket服务器接收消息.

class GameActor() extends Actor {

  private var connections: List[ActorRef] …
Run Code Online (Sandbox Code Playgroud)

scala akka-stream akka-http

6
推荐指数
1
解决办法
1076
查看次数

使用https时,akka http客户端system.shutdown()会产生“传出请求流错误(akka.stream.AbruptTerminationException)”

您好以下代码按预期工作。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

import system.dispatcher

val request = HttpRequest(uri = "http://www.google.com")

Http.get(system).singleRequest(request).map(_.entity.dataBytes.runWith(Sink.ignore))
  .onComplete { _ =>
    println("shutting down actor system...")
    system.terminate()
  }
Run Code Online (Sandbox Code Playgroud)

但是,如果我将http://www.google.com更改为https://www.google.com,如下所示:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

import system.dispatcher

val request = HttpRequest(uri = "https://www.google.com")

Http.get(system).singleRequest(request).map(_.entity.dataBytes.runWith(Sink.ignore))
  .onComplete { _ =>
    println("shutting down actor system...")
    system.terminate()
  }
Run Code Online (Sandbox Code Playgroud)

我收到以下错误消息:

shutting down actor system...
[ERROR] [02/11/2017 13:13:08.929] [default-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(default)] Outgoing request stream error (akka.stream.AbruptTerminationException)
Run Code Online (Sandbox Code Playgroud)

谁知道为什么https会产生上述错误,我该如何解决?

akka-stream akka-http

6
推荐指数
1
解决办法
1404
查看次数

akka-http没有堆栈跟踪或错误的详细信息

我得到的结构基本上可以概括为:

  • 外部用户向akka-http服务器发出休息请求
  • akka-http使用asynchttpclient向(某些)数据源发出请求(查询?)
  • akka-http转换asynchttpclient的结果并将其提供给用户

在某些时候,我从akka收到一个错误,它几乎没有告诉我.在asynchttpclient返回一些结果后立即发生此错误.(我可以在这一点上打印日志上的结果,它们是从json等解析的..但akka已经出错了)

即使在调试日志记录级别,我也没有从akka或stacktrace获得可解密的错误消息.

我得到的唯一信息是:

2017-03-24 17:22:55 INFO  CompanyRepository:111 - search company with name:"somecompanyname"
2017-03-24 17:22:55 INFO  CompanyRepository:73 - [QUERY TIME]: 527ms
[ERROR] [03/24/2017 17:22:55.951] [company-api-system-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(company-api-system)] Error during processing of request: 'requirement failed'. Completing with 500 Internal Server Error response.
Run Code Online (Sandbox Code Playgroud)

这个错误信息是我唯一得到的.配置的相关部分:

akka {
  loglevel = "DEBUG"
  # edit --  tested with sl4jlogger with no change
  #loggers = ["akka.event.slf4j.Slf4jLogger"]
  #logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  parsing {
    max-content-length = 800m
    max-chunk-size             = 100m
  }
  server {
    server-header = akka-http/${akka.http.version}
    idle-timeout = …
Run Code Online (Sandbox Code Playgroud)

scala akka asynchttpclient akka-http

6
推荐指数
1
解决办法
2478
查看次数

是否使用源队列实现线程安全的akka​​-http中的连接池?

参考以下提到的实现:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()
Run Code Online (Sandbox Code Playgroud)

从多个线程提供队列http请求是否可以安全线程?如果不是,那么实施此类要求的最佳方式是什么?也许使用一个专门的演员?

scala connection-pooling akka-stream akka-http

6
推荐指数
1
解决办法
648
查看次数