Abh*_*rar 5 java-8 playframework-2.0 akka-stream
我正在尝试使用PLay 2和Akka在webapp中实现块响应.但是,不是按块加载响应,而是所有响应都是一次响应.下面是我在控制器中创建块的代码:
/**
*
*/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.pmw.tinylog.Logger;
import play.cache.CacheApi;
import play.cache.Cached;
import play.filters.csrf.AddCSRFToken;
import play.filters.csrf.CSRF;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.Http;
import play.mvc.Http.Cookie;
import play.mvc.Result;
import akka.NotUsed;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
/**
* @author Abhinabyte
*
*/
@Singleton
@AddCSRFToken
public class GetHandler extends Controller {
@Inject
private CacheApi cache;
@Inject
private HttpExecutionContext httpExecutionContext;
public CompletionStage<Result> index() {
return CompletableFuture.supplyAsync( () ->
Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
CompletableFuture.runAsync(() -> {
sourceActor.tell(ByteString.fromString("1"), null);
sourceActor.tell(ByteString.fromString("2"), null);
sourceActor.tell(ByteString.fromString("3"), null);
try {
Thread.sleep(3000);//intentional delay
} catch (InterruptedException e) {
e.printStackTrace();
}
sourceActor.tell(ByteString.fromString("444444444444444444444444444444444444444444444444444444444444444444444444"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
});
return sourceActor;
})
).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));
}
}
Run Code Online (Sandbox Code Playgroud)
以下是application.conf中的Akka线程池配置:
akka {
jvm-exit-on-fatal-error = on
actor {
default-dispatcher {
fork-join-executor {
parallelism-factor = 1.0
parallelism-max = 64
task-peeking-mode = LIFO
}
}
}
}
play.server.netty {
eventLoopThreads = 0
maxInitialLineLength = 4096
log.wire = false
transport = "native"
}
Run Code Online (Sandbox Code Playgroud)
正如你在发送最后一个块之前所看到的那样,我故意推迟响应时间.从逻辑上讲,之前的所有分块数据都应该在它之前传递.
但是,在我的情况下,整堆数据正在加载.我已经在所有浏览器中测试过(甚至尝试过CURL).
我在这里缺少什么?
阻塞mapMaterializedValue会这样做,因为它在 Akkadefault-dispatcher线程中运行,从而在持续时间内阻止消息路由(有关详细信息,请参阅此答案)。您希望异步分派缓慢、阻塞的代码,并使用参与者引用来发布消息。如果您将来运行它,您的示例将达到您的预期:
public CompletionStage<Result> test() {
return CompletableFuture.supplyAsync( () ->
Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 20; i++) {
sourceActor.tell(ByteString.fromString(String.valueOf(i) + "<br/>\n"), null);
try {
Thread.sleep(500);//intentional delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
});
return sourceActor;
})
).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));
}
Run Code Online (Sandbox Code Playgroud)