Flux.just("a", "b")
.flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
.onErrorResume(throwable -> Mono.empty())
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
你好!
在这里,我对两个元素进行了处理,然后通过flatMap将第一个暴露给异常,第二个暴露给另一个Flux。
随着onErrorResume我期望的输出
b1
b2
Run Code Online (Sandbox Code Playgroud)
但一无所获。有人可以解释为什么会发生吗?
谢谢。
哪些情况会导致Flux::flatMap同时收听多个源(0...无穷大)?
我在实验时发现,当上游向flatMap线程内发送信号时thread-upstream-1,并且存在NflatMap 将侦听的内部流,并且每个内部流都在不同的线程中发送信号:thread-inner-stream-ifor 1<=i<=N,而不是每个1<=i<=Nif thread-upstream-1 != thread-inner-stream-i,flatMap将同时侦听所有内部流溪流。
我认为这并不完全正确,我错过了一些其他场景。
java spring reactive-programming project-reactor reactive-streams
我正在尝试编写一个使用Akka Streams的简单介绍示例.我试图基本上创建一个流,它将一系列整数作为源并过滤掉所有非素数的整数,产生一个素数整数流作为其输出.
构造流的类相当简单; 为此,我有以下.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PrimeStream {
private final AverageRepository averageRepository = new AverageRepository();
private final ActorSystem actorSystem;
public PrimeStream(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
public Flow<Integer, Integer, NotUsed> filterPrimes() {
return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行以下测试时,它工作正常.
private final ActorSystem actorSystem = ActorSystem.create("Sys");
@Test
public void testStreams() {
Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
Source<Integer, NotUsed> flow …Run Code Online (Sandbox Code Playgroud) 我正在查看示例并阅读文档,在尝试以并行方式订阅 Flux 时发现了一些问题。
我有 3 个功能,如下所示。
private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)
fun a() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
sequence.subscribe { log.info(">>> {}", it) }
}
fun b() {
sequence.subscribe { log.info(">>> {}", it) }
}
fun c() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
Run Code Online (Sandbox Code Playgroud)
a()现在,当我单独运行每个方法时,我从函数和中获得了正确的输出b(),但从 的输出c()为空。这是预期的吗?是设计使然吗?如果是这样,为什么会发生这种情况?