哪些情况会导致Flux::flatMap同时收听多个源(0...无穷大)?
我在实验时发现,当上游向flatMap线程内发送信号时thread-upstream-1,并且存在NflatMap 将侦听的内部流,并且每个内部流都在不同的线程中发送信号:thread-inner-stream-ifor 1<=i<=N,而不是每个1<=i<=Nif thread-upstream-1 != thread-inner-stream-i,flatMap将同时侦听所有内部流溪流。
我认为这并不完全正确,我错过了一些其他场景。
java spring reactive-programming project-reactor reactive-streams
我有一个端点,如示例代码块中所示。流式传输时,我通过 调用异步方法streamHelper.getStreamSuspendCount()。我正在更改状态时停止此异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。更改状态时,我将停止会话范围内的异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。会话关闭时如何访问此范围?
@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
return streamHelper.getStreamSuspendCount(columnObject);
}
getStreamSuspendCount(ColumnObject columnObject) {
...
//async flux
Flux<?> newFlux = beSubscribeFlow.get(i);
Disposable disposable = newFlux.subscribe();
beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
...
return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
.doOnNext(i -> {
System.out.println(i);
}));
}
Run Code Online (Sandbox Code Playgroud) server-sent-events spring-boot project-reactor spring-session
我正在查看示例并阅读文档,在尝试以并行方式订阅 Flux 时发现了一些问题。
我有 3 个功能,如下所示。
private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)
fun a() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
sequence.subscribe { log.info(">>> {}", it) }
}
fun b() {
sequence.subscribe { log.info(">>> {}", it) }
}
fun c() {
sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
Run Code Online (Sandbox Code Playgroud)
a()现在,当我单独运行每个方法时,我从函数和中获得了正确的输出b(),但从 的输出c()为空。这是预期的吗?是设计使然吗?如果是这样,为什么会发生这种情况?
使用 Flux 如何访问前一个元素?
我有一个外部事件流,它按顺序提供事件,该流的顺序是调度一个事件,然后立即调度另一个事件。然而,第二个事件的元数据位于第一个事件中。
请注意,事件数量并不总是偶数。
我想做的是将事件组合成事件流以供下游使用。
Flux#zip看起来很有希望,但这意味着返回外部事件类型的对象。
到目前为止我所拥有的是。
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
Flux<Event> bridge = Flux.create(sink -> {
EventListener fluxListener = event -> {
sink.next(event);
};
client.registerEventListener(fluxListener);
});
bridge.subscribe(DemoApplication::printEvent);
bridge.map(new EventPairMemorizer());
public class EventPair {
private final Event previous;
private final Event current;
public EventPair(Event previous, Event current) {
this.previous = previous;
this.current = current;
}
/**
* @return `null` if no previous events.
*/
public Event getPrevious() {
return previous;
} …Run Code Online (Sandbox Code Playgroud) SpringBoot v2.5.1
有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).
客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。
通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?
@RestController
class EndpointSpin {
@GetMapping("/spin")
Mono<Long> spin() {
AtomicLong counter = new AtomicLong(0);
Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));
return Mono.fromCallable(() -> {
while (Instant.now().isBefore(stopTime)) {
counter.incrementAndGet();
if (counter.get() % 10_000_000 == 0) {
System.out.println(counter.get());
}
// of course this does not work
if (Thread.currentThread().isInterrupted()){
break;
}
}
return counter.get();
});
}
}
Run Code Online (Sandbox Code Playgroud) java reactive-programming cancellation project-reactor spring-webflux
我有一个如下所示的 Mono:
private void getNodeDetail() {
Mono<String> mono = webClient.get()
.uri("/alfresco/api/-default-/public/alfresco/versions/1/nodes/f37b52a8-de40-414b-b64d-a958137e89e2")
.retrieve().bodyToMono(String.class);
System.out.println(mono.subscribe());
System.out.println(mono.block());
}
Run Code Online (Sandbox Code Playgroud)
问题:第一个 sysout 向我展示了reactor.core.publisher.LambdaSubscriber@77114efe使用 block() 时的情况,它向我展示了我需要的内容(json 字符串)。但我想使用 Aysnc 方法。那么,如上所述,这是否意味着我的目标系统(在本例中为 Alfresco)不支持异步调用?subscribe()如果不是这种情况,如何使用,就像 一样以字符串格式在控制台上打印响应block()?
我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。
这是挂起的代码:
@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
var scheduler = Schedulers
.newBoundedElastic(
1000,
1000,
"really-big-scheduler"
);
Flux.range(0, uniqueStringsCount)
.map(Object::toString)
.repeat()
// this represents "a lot of events"
.take(50_000)
.groupBy(x -> x)
// this gets the same results
// .parallel(400)
.parallel()
.flatMap(group ->
group.concatMap(e ->
// this represents a processing operation on each event
Mono.fromRunnable(() …Run Code Online (Sandbox Code Playgroud) 当上游发出 nullValue 时,我们可以使用 'Mono.defualtIfEmpty()' 或 'Mono.switchIfEmpty()' 来替换 null 值。
但switchIfEmpty()评价上游价值急切。因此我们使用 Mono.defer() 进行惰性求值。
'Mono.defualtIfEmpty()' 也是急于求值吗?像 switchIfEmpty() 一样?
如何更改 Mono.defualtIfEmpty() 来进行惰性评估?
reactive-programming project-reactor reactive spring-webflux
java ×4
spring ×2
spring-boot ×2
alfresco ×1
cancellation ×1
concurrency ×1
group-by ×1
kotlin ×1
reactive ×1