为什么Play 2.5 Akka chunk响应一次性加载

Abh*_*rar 5 java-8 playframework-2.0 akka-stream

我正在尝试使用PLay 2和Akka在webapp中实现块响应.但是,不是按块加载响应,而是所有响应都是一次响应.下面是我在控制器中创建块的代码:

/**
 * 
 */    
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.pmw.tinylog.Logger;
import play.cache.CacheApi;
import play.cache.Cached;
import play.filters.csrf.AddCSRFToken;
import play.filters.csrf.CSRF;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.Http;
import play.mvc.Http.Cookie;
import play.mvc.Result;

import akka.NotUsed;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

/**
 * @author Abhinabyte
 *
 */
@Singleton
@AddCSRFToken
public class GetHandler extends Controller {

    @Inject
    private CacheApi cache;

    @Inject
    private HttpExecutionContext httpExecutionContext;

    public CompletionStage<Result> index() {

return CompletableFuture.supplyAsync( () ->
            Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
                    .mapMaterializedValue(sourceActor -> {

                        CompletableFuture.runAsync(() -> {
                            sourceActor.tell(ByteString.fromString("1"), null);
                            sourceActor.tell(ByteString.fromString("2"), null);
                            sourceActor.tell(ByteString.fromString("3"), null);
                            try {
                                Thread.sleep(3000);//intentional delay
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            sourceActor.tell(ByteString.fromString("444444444444444444444444444444444444444444444444444444444444444444444444"), null);
                            sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                        });

                        return sourceActor;
                    })
    ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));  

    }

}
Run Code Online (Sandbox Code Playgroud)


以下是application.conf中的Akka线程池配置:

akka {
  jvm-exit-on-fatal-error = on
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-factor = 1.0
        parallelism-max = 64
        task-peeking-mode = LIFO
      }
    }
  }
}

play.server.netty {
  eventLoopThreads = 0
  maxInitialLineLength = 4096
  log.wire = false
  transport = "native"
}
Run Code Online (Sandbox Code Playgroud)

正如你在发送最后一个块之前所看到的那样,我故意推迟响应时间.从逻辑上讲,之前的所有分块数据都应该在它之前传递.
但是,在我的情况下,整堆数据正在加载.我已经在所有浏览器中测试过(甚至尝试过CURL).
我在这里缺少什么?

Mik*_*ame 1

阻塞mapMaterializedValue会这样做,因为它在 Akkadefault-dispatcher线程中运行,从而在持续时间内阻止消息路由(有关详细信息,请参阅此答案)。您希望异步分派缓慢、阻塞的代码,并使用参与者引用来发布消息。如果您将来运行它,您的示例将达到您的预期:

public CompletionStage<Result> test() {
    return CompletableFuture.supplyAsync( () ->
            Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
                    .mapMaterializedValue(sourceActor -> {

                        CompletableFuture.runAsync(() -> {

                            for (int i = 0; i < 20; i++) {
                                sourceActor.tell(ByteString.fromString(String.valueOf(i) + "<br/>\n"), null);
                                try {
                                    Thread.sleep(500);//intentional delay
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                            sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                        });

                        return sourceActor;
                    })
    ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));
}
Run Code Online (Sandbox Code Playgroud)