我正在研究一个实验项目的akka-http(akka-http-experimental_2.11 - 0.4).我之前没有参与喷涂工作.
我想流式传输mp4视频(大小可能会有所不同)到浏览器.但我不知道如何为HttpResponse创建HttpEntity(HttpEntity.Chunked?).我尝试过像这样的脏东西,这不是一种正确的方法,但这只适用于Firefox,只能提供单一请求.
def output = Source.fromFile("C:\\Users\\karthik\\Downloads\\big_buck_bunny.mp4")(scala.io.Codec.ISO8859)
lazy val video = HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`video/mp4`, Flow(output.map(_.toByte).map(a => ChunkStreamPart(ByteString(a)))).toProducer(materializer)))
Run Code Online (Sandbox Code Playgroud)
当我在另一个选项卡或浏览器中打开相同的URL时,服务器无法处理该请求.由于这是一个实验性项目,因此没有足够的文档来处理大型文件流.
我需要知道如何为HttpEntity.Chunked创建Producer.如果任何人可以用简单的术语解释,这将有助于理解API.
谢谢.
(PS:有人请在Stack Overflow中创建Akka-Http标签)
在akka-http路由中,我可以Future作为隐式转换为的响应返回ToResponseMarshaller.
有没有办法处理这个未来的超时?或路由级别的连接超时?或者一种方法是使用Await()功能?
现在客户端可以永远等待响应.
complete {
val future = for {
response <- someIOFunc()
entity <- someOtherFunc()
} yield entity
future.onComplete({
case Success(result) =>
HttpResponse(entity = HttpEntity(MediaTypes.`text/xml`, result))
case Failure(result) =>
HttpResponse(entity = utils.getFault("fault"))
})
future
}
Run Code Online (Sandbox Code Playgroud) 我有一个指令,定义为
def allowedRoles(roles: UserRole*)(implicit login: Login): Directive0 = ???
Run Code Online (Sandbox Code Playgroud)
但我似乎无法在不必明确传递login参数的情况下使用它
def myRoutes(implicit req: HttpRequest, login: Login) = {
path("example" / "path") {
get {
allowedRoles(Administrator) { // ? fails
handleGet
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我尝试编译它失败,类型不匹配:
[error] found : akka.http.scaladsl.server.Route
[error] (which expands to) akka.http.scaladsl.server.RequestContext => scala.concurrent.Future[akka.http.scaladsl.server.RouteResult]
[error] required: com.example.Login
[error] allowedRoles(Administrator) { handleGet } }
Run Code Online (Sandbox Code Playgroud)
如果我将标记线更改为allowedRoles(Administrator)(login)然后它可以工作,但似乎我不应该这样做,我错过了什么?
我正在尝试使用reactive-kafka,akka-http和akka-stream将一个Kafka使用者写入websocket流.
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)
class CommandPublisher extends ActorPublisher[String] {
override def receive = {
case cmd: String =>
if (isActive && totalDemand > 0)
onNext(cmd)
}
}
object CommandPublisher {
def props: Props = Props(new CommandPublisher())
}
// This is the route
def mainFlow(): Route = {
path("ws" / "commands" ) {
handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
}
}
Run Code Online (Sandbox Code Playgroud)
从kafka使用者(这里省略),我做了一个publisherActor ! commandString动态添加内容到websocket.
但是,当我启动多个客户端到websocket时,我在后端遇到此异常:
[ERROR] …Run Code Online (Sandbox Code Playgroud) 我正在研究akka http
"com.typesafe.akka" %% "akka-actor" % "2.4.6",
"com.typesafe.akka" % "akka-http-experimental_2.11" % "2.4.6"
Run Code Online (Sandbox Code Playgroud)
我正在测试简单的代码,如下所示.我的主要问题是如何增强它以获得关闭连接的通知,那么我可以打印当前打开的连接数?
object StatsRepo{
val totConn = new AtomicInteger(0)
val currOpenConn = new AtomicInteger(0) // how to count this?
}
object Boot2 extends App{
implicit val system = ActorSystem("akka-http")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val requestHandler: HttpRequest => Future[HttpResponse] = {
// do some work here...
}
val serverSource = Http().bind("0.0.0.0", 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
StatsRepo.totConn.incrementAndGet()
connection handleWithAsyncHandler requestHandler …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Akka-Http来调用REST URL.我是从akka文档中关注这个例子的.使用此我可以进行其余的通话.但我无法找到如何添加自定义请求标头.我尝试使用ModeledCustomHeader,但仍然请求没有标题.这是我的例子.
final class ApiTokenHeader(token: String) extends ModeledCustomHeader[ApiTokenHeader] {
override def renderInRequests = false
override def renderInResponses = false
override val companion = ApiTokenHeader
override def value: String = token
}
object ApiTokenHeader extends ModeledCustomHeaderCompanion[ApiTokenHeader] {
override val name = "apiKey"
override def parse(value: String) = Try(new ApiTokenHeader(value))
}
Run Code Online (Sandbox Code Playgroud)
这就是我调用的方式,
def invokeHttpRequest(cmd: WSRequestCommand) = {
val s: HttpRequest = HttpRequest(uri = cmd.url).addHeader(ApiTokenHeader(cmd.apiKey))
sender ! http.singleRequest(s)
}
Run Code Online (Sandbox Code Playgroud)
而不是addHeader,我尝试使用addHeaders(),但Seq(ApiTokenHeader)不起作用,因为它给出了编译错误.
val s: HttpRequest = HttpRequest(uri = cmd.url, headers = Seq(ApiTokenHeader(cmd.apiKey)))
Run Code Online (Sandbox Code Playgroud)
错误:(55,66)类型不匹配; …
我正在研究scala中的简单多人游戏,我想通过websockets为JS客户端公开.
这是我的WebsocketServer类
class WebsocketServer(actorRef: ActorRef, protocol: Protocol, system: ActorSystem, materializer: ActorMaterializer) extends Directives {
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.map {
case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage)
}
.via(actorFlow)
.map(event => TextMessage.Strict(protocol.serialize(event)))
def actorFlow : Flow[Protocol.Message, Protocol.Event, Any] = {
val sink =
Flow[Protocol.Message]
.to(Sink.actorRef[Protocol.Message](actorRef, Protocol.CloseConnection()))
val source =
Source.actorRef[Protocol.Event](1, OverflowStrategy.fail)
.mapMaterializedValue(actor => actorRef ! Protocol.OpenConnection(actor))
Flow.fromSinkAndSource(sink, source)
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的actor的简化代码,它应该从websocket服务器接收消息.
class GameActor() extends Actor {
private var connections: List[ActorRef] …Run Code Online (Sandbox Code Playgroud) 您好以下代码按预期工作。
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val request = HttpRequest(uri = "http://www.google.com")
Http.get(system).singleRequest(request).map(_.entity.dataBytes.runWith(Sink.ignore))
.onComplete { _ =>
println("shutting down actor system...")
system.terminate()
}
Run Code Online (Sandbox Code Playgroud)
但是,如果我将http://www.google.com更改为https://www.google.com,如下所示:
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val request = HttpRequest(uri = "https://www.google.com")
Http.get(system).singleRequest(request).map(_.entity.dataBytes.runWith(Sink.ignore))
.onComplete { _ =>
println("shutting down actor system...")
system.terminate()
}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误消息:
shutting down actor system...
[ERROR] [02/11/2017 13:13:08.929] [default-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(default)] Outgoing request stream error (akka.stream.AbruptTerminationException)
Run Code Online (Sandbox Code Playgroud)
谁知道为什么https会产生上述错误,我该如何解决?
我得到的结构基本上可以概括为:
在某些时候,我从akka收到一个错误,它几乎没有告诉我.在asynchttpclient返回一些结果后立即发生此错误.(我可以在这一点上打印日志上的结果,它们是从json等解析的..但akka已经出错了)
即使在调试日志记录级别,我也没有从akka或stacktrace获得可解密的错误消息.
我得到的唯一信息是:
2017-03-24 17:22:55 INFO CompanyRepository:111 - search company with name:"somecompanyname"
2017-03-24 17:22:55 INFO CompanyRepository:73 - [QUERY TIME]: 527ms
[ERROR] [03/24/2017 17:22:55.951] [company-api-system-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(company-api-system)] Error during processing of request: 'requirement failed'. Completing with 500 Internal Server Error response.
Run Code Online (Sandbox Code Playgroud)
这个错误信息是我唯一得到的.配置的相关部分:
akka {
loglevel = "DEBUG"
# edit -- tested with sl4jlogger with no change
#loggers = ["akka.event.slf4j.Slf4jLogger"]
#logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
parsing {
max-content-length = 800m
max-chunk-size = 100m
}
server {
server-header = akka-http/${akka.http.version}
idle-timeout = …Run Code Online (Sandbox Code Playgroud) 参考以下提到的实现:
http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
Run Code Online (Sandbox Code Playgroud)
从多个线程提供队列http请求是否可以安全线程?如果不是,那么实施此类要求的最佳方式是什么?也许使用一个专门的演员?