标签: reactive-streams

项目反应堆:flatMap之后的onErrorResume

Flux.just("a", "b")
        .flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
        .onErrorResume(throwable -> Mono.empty())
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

你好!

在这里,我对两个元素进行了处理,然后通过flatMap将第一个暴露给异常,第二个暴露给另一个Flux。

随着onErrorResume我期望的输出

b1
b2
Run Code Online (Sandbox Code Playgroud)

但一无所获。有人可以解释为什么会发生吗?

谢谢。

java reactor project-reactor reactive-streams

0
推荐指数
2
解决办法
1913
查看次数

FlatMap 何时会同时监听多个源?

哪些情况会导致Flux::flatMap同时收听多个源(0...无穷大)?


我在实验时发现,当上游向flatMap线程内发送信号时thread-upstream-1,并且存在NflatMap 将侦听的内部流,并且每个内部流都在不同的线程中发送信号:thread-inner-stream-ifor 1<=i<=N,而不是每个1<=i<=Nif thread-upstream-1 != thread-inner-stream-iflatMap同时侦听所有内部流溪流。

我认为这并不完全正确,我错过了一些其他场景。

java spring reactive-programming project-reactor reactive-streams

0
推荐指数
1
解决办法
2241
查看次数

当Source有大量记录时,Akka流不会运行

我正在尝试编写一个使用Akka Streams的简单介绍示例.我试图基本上创建一个流,它将一系列整数作为源并过滤掉所有非素数的整数,产生一个素数整数流作为其输出.

构造流的类相当简单; 为此,我有以下.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}
Run Code Online (Sandbox Code Playgroud)

当我运行以下测试时,它工作正常.

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow …
Run Code Online (Sandbox Code Playgroud)

java akka reactive-streams akka-stream

0
推荐指数
1
解决办法
141
查看次数

Project Reactor - 订阅并行调度程序不起作用

我正在查看示例并阅读文档,在尝试以并行方式订阅 Flux 时发现了一些问题。

我有 3 个功能,如下所示。

private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)

fun a() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
    sequence.subscribe { log.info(">>> {}", it) }
}

fun b() {
    sequence.subscribe { log.info(">>> {}", it) }
}

fun c() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
Run Code Online (Sandbox Code Playgroud)

a()现在,当我单独运行每个方法时,我从函数和中获得了正确的输出b(),但从 的输出c()为空。这是预期的吗?是设计使然吗?如果是这样,为什么会发生这种情况?

kotlin project-reactor reactive-streams

0
推荐指数
1
解决办法
1131
查看次数