目的是从数据库流式传输数据,对此数据块执行一些计算(此计算返回某些案例类的Future),并将此数据作为分块响应发送给用户.目前,我能够流式传输数据并发送响应,而无需执行任何计算.但是,我无法执行此计算,然后流式传输结果.
这是我实施的路线.
def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}
Run Code Online (Sandbox Code Playgroud)
函数getRds返回映射到case类的表的行(使用slick).现在考虑函数compute,它将每行作为输入并返回另一个案例类的Future.就像是
def compute(x: Tweet) : Future[TweetNew] = ?
Run Code Online (Sandbox Code Playgroud)
如何在变量src上实现此函数,并将此计算的分块响应(作为流)发送给用户.
我目前有一个指令,我用来保护Akka HTTP应用程序中的资源,如下所示:
def authenticate: Directive1[Login] =
optionalHeaderValueByName("Authorization") flatMap {
val accessToken = authz.split(' ').last
case Some(authz) =>
LoggedInUser findByAccessToken accessToken match {
case Some(user) => provide(user)
case None => reject(AuthorizationFailedRejection)
}
case None => reject(AuthorizationFailedRejection)
}
Run Code Online (Sandbox Code Playgroud)
在哪里LoggedInUser.findByAccessToken()对数据库进行阻塞查询,我想把它转换为异步ask到一个可以提供相同数据的actor,我可以将ActorRefas作为参数传递给指令但我无法解决如何处理问题Future返回.
Directive1Akka HTTP附带的所有示例似乎都没有这样做(至少我可以找到),尽管有一些指令返回的例子Route.
我甚至想做什么?是一种可能的方法来创建一个StandardRoute具有用户凭据字段的子类并以某种方式返回?
寻找有关如何使用akka HTTP进行身份验证的良好解释.给出一条看起来像的路线
val route =
path("account") {
authenticateBasic(realm = "some realm", myAuthenticator) { user =>
get {
encodeResponseWith(Deflate) {
complete {
//do something here
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
文档概述了一种方法,但是省略了执行实际认证的相关部分......
// backend entry points
def myAuthenticator: Authenticator[User] = ???
Run Code Online (Sandbox Code Playgroud)
我在哪里可以找到这种验证器的示例实现?我已经有了用于在给定用户名和密码的情况下对用户进行身份验证的逻辑,但我无法弄清楚如何从HTTP请求(或RequestContext)获取用户名/密码(或包含两者的令牌).
我刚刚开始测试Akka HTTP请求级客户端API(基于未来).我一直在努力弄清楚的一件事是如何为此编写单元测试.有没有办法模拟响应并在不必实际执行HTTP请求的情况下完成未来?
我正在查看API和testkit包,试图看看我如何使用它,只是在文档中找到它实际上说的:
akka-http-testkit用于验证服务器端服务实现的测试工具和实用程序集
我正在思考一些事情TestServer(有点像TestSourceAkka Streams),并使用服务器端路由DSL来创建预期的响应,并以某种方式将其挂钩到Http对象上.
以下是我想要测试的函数的简化示例:
object S3Bucket {
def sampleTextFile(uri: Uri)(
implicit akkaSystem: ActorSystem,
akkaMaterializer: ActorMaterializer
): Future[String] = {
val request = Http().singleRequest(HttpRequest(uri = uri))
request.map { response => Unmarshal(response.entity).to[String] }
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试将请求有效负载解组为字符串,但由于某种原因它失败了.我的代码:
path("mypath") {
post {
decodeRequest {
entity(as[String]) {jsonStr => //could not find implicit value for...FromRequestUnmarshaller[String]
complete {
val json: JsObject = Json.parse(jsonStr).as[JsObject]
val jsObjectFuture: Future[JsObject] = MyDatabase.addListItem(json)
jsObjectFuture.map(_.as[String])
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
例如,在这个SO线程中,似乎默认情况下这个隐式应该是可用的.但也许这在akka-http中有所不同?
我尝试导入akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers哪一个stringUnmarshaller但它没有帮助.也许因为这FromEntityUnmarshaller[String]不会返回类型FromRequestUnmarshaller[String].还有一个字符串unmarshaller,spray.httpx.unmarshalling.BasicUnmarshallers但这也没有帮助akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers
我如何解组(和编组)成一个字符串?
(Bonus:如何直接在JsObject中解组(播放json).但也只是字符串,因为我对它为什么不起作用感兴趣,并且它可能对其他情况有用).
使用1.0-RC3
谢谢.
我正在尝试Akka-http,希望有人可以解释以下问题:
如何根据请求中的accept:标头创建不同的路由?例如,我想要一个代码路径来处理"json",一个代码来处理"xml"请求(如果缺少头,则默认为"json")
如果我不希望推断出contentType,我该如何指定它?例如,在下面的代码中,我尝试通过compactPrint()运行json,但这会将其更改为字符串,因此为"text/plain".我想覆盖它并告诉客户端它仍然是json.
我的代码是这样的;
...
path("api") {
get {
complete {
getStuff.map[ToResponseMarshallable] {
case Right(r) if r.isEmpty => List[String]().toJson.compactPrint
case Right(r) => r.toJson.compactPrint
case Left(e) => BadRequest -> e
}
}
}
}
...
Run Code Online (Sandbox Code Playgroud)
这种情况下的响应是text/plain,因为compactPrint创建了一个字符串.批评非常欢迎.;)
我正在使用akka流,我有一段我的图形,我需要有条件地跳过,因为流不能处理某些值.具体来说,我有一个获取字符串并发出http请求的流,但是当字符串为空时服务器无法处理这种情况.但我需要返回一个空字符串.有没有办法做到这一点,而不必通过http请求知道它会失败?我基本上有这个:
val source = Source("1", "2", "", "3", "4")
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
Run Code Online (Sandbox Code Playgroud)
我唯一能想到的就是在我的httpResponse流中捕获400错误并返回默认值.但我希望能够避免因为我知道事先会失败的请求而命中服务器的开销.
我正在尝试使用Akka HTTP 2.0-M2编写批量数据上传工具.但我正面临着akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.
我试图隔离一个问题,这里的示例代码也失败了:
public class TestMaxRequests {
private static final class Router extends HttpApp {
@Override
public Route createRoute() {
return route(
path("test").route(
get(handleWith(ctx -> ctx.complete("OK")))
)
);
}
}
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(actorSystem);
Router router = new Router();
router.bindRoute("127.0.0.1", 8082, actorSystem);
LoggingAdapter log = Logging.getLogger(actorSystem, new Object());
for (int i = 0; i < 100; i++) {
final int reqNum …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Akka HTTP在我的应用程序中实现文件上载功能.我正在使用akka-stream版本2.4.4.
这是代码(从akka-doc修改)
path("fileupload") {
post {
extractRequestContext {
ctx => {
implicit val materializer = ctx.materializer
implicit val ec = ctx.executionContext
fileUpload("fileUpload") {
case (metadata, byteSource) =>
val location = FileUtil.getUploadPath(metadata)
val updatedFileName = metadata.fileName.replaceAll(" ", "").replaceAll("\"", "")
val uniqFileName = uniqueFileId.concat(updatedFileName)
val fullPath = location + File.separator + uniqFileName
val writer = new FileOutputStream(fullPath)
val bufferedWriter = new BufferedOutputStream(writer)
val result = byteSource.map(s => {
bufferedWriter.write(s.toArray)
}).runWith(Sink.ignore)
val result1 = byteSource.runWith(Sink.foreach(s=>bufferedWriter.write(s.toArray)))
Await.result(result1, …Run Code Online (Sandbox Code Playgroud) akka-http ×10
scala ×7
akka ×6
akka-stream ×4
akka-testkit ×1
file-upload ×1
java ×1
json ×1
netty ×1
slick ×1