当Source有大量记录时,Akka流不会运行

Jef*_*man 0 java akka reactive-streams akka-stream

我正在尝试编写一个使用Akka Streams的简单介绍示例.我试图基本上创建一个流,它将一系列整数作为源并过滤掉所有非素数的整数,产生一个素数整数流作为其输出.

构造流的类相当简单; 为此,我有以下.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}
Run Code Online (Sandbox Code Playgroud)

当我运行以下测试时,它工作正常.

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}
Run Code Online (Sandbox Code Playgroud)

但是,当我通过将测试中的线更改为以下来将范围增加x10倍时,它将不再起作用.

Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);
Run Code Online (Sandbox Code Playgroud)

现在,当测试运行时,不会抛出任何异常,也没有警告.它只是运行,然后退出,根本不向控制台显示任何文本.

为了更加确定问题不在我的素性测试本身,我在相同的范围内运行测试而不使用Akka Streams,它运行正常.以下代码运行没有问题.

@Test
public void testPlain() {
    List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
    List<Integer> out = PrimeKernel.filterPrimes(in);
    System.out.println(out);
}
Run Code Online (Sandbox Code Playgroud)

为了清楚起见,primality测试本身接受一个整数列表,如果它不是素数,则将列表中的任何元素设置为0.

正如@RamonJRomeroyVigil所建议的那样,如果我将mapConcat部分全部移除,但保留每个实例,它实际上打印出10,000个整数.但是,如果我将所有内容保持不变,只需将filterPrimes替换为只返回方法参数的方法而不触及它,那么它根本不会在屏幕上打印任何内容.我也尝试将println添加到开始的filterPrime中进行调试.每当它不打印包含调试语句的任何输出时.因此,根本没有尝试过调用filterPrimes.

Ank*_*kur 5

runForeach返回a CompletionStage,所以如果你想看到所有的数字都被打印出来,那么你必须等待CompletionStage测试函数返回,程序终止而不CompletionStage完成.

例:

flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();
Run Code Online (Sandbox Code Playgroud)