标签: project-reactor

FlatMap 何时会同时监听多个源?

哪些情况会导致Flux::flatMap同时收听多个源(0...无穷大)?


我在实验时发现,当上游向flatMap线程内发送信号时thread-upstream-1,并且存在NflatMap 将侦听的内部流,并且每个内部流都在不同的线程中发送信号:thread-inner-stream-ifor 1<=i<=N,而不是每个1<=i<=Nif thread-upstream-1 != thread-inner-stream-iflatMap同时侦听所有内部流溪流。

我认为这并不完全正确,我错过了一些其他场景。

java spring reactive-programming project-reactor reactive-streams

0
推荐指数
1
解决办法
2241
查看次数

如何处理sse连接关闭?

我有一个端点,如示例代码块中所示。流式传输时,我通过 调用异步方法streamHelper.getStreamSuspendCount()。我正在更改状态时停止此异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。更改状态时,我将停止会话范围内的异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。会话关闭时如何访问此范围?

@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
    ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
    return streamHelper.getStreamSuspendCount(columnObject);
}


getStreamSuspendCount(ColumnObject columnObject) {
   ...
   //async flux
   Flux<?> newFlux = beSubscribeFlow.get(i);
   Disposable disposable = newFlux.subscribe();
   beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
   ...
   return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
                    .doOnNext(i -> {
                        System.out.println(i);
                    }));
}
Run Code Online (Sandbox Code Playgroud)

server-sent-events spring-boot project-reactor spring-session

0
推荐指数
1
解决办法
2759
查看次数

Project Reactor - 订阅并行调度程序不起作用

我正在查看示例并阅读文档,在尝试以并行方式订阅 Flux 时发现了一些问题。

我有 3 个功能,如下所示。

private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)

fun a() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
    sequence.subscribe { log.info(">>> {}", it) }
}

fun b() {
    sequence.subscribe { log.info(">>> {}", it) }
}

fun c() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
Run Code Online (Sandbox Code Playgroud)

a()现在,当我单独运行每个方法时,我从函数和中获得了正确的输出b(),但从 的输出c()为空。这是预期的吗?是设计使然吗?如果是这样,为什么会发生这种情况?

kotlin project-reactor reactive-streams

0
推荐指数
1
解决办法
1131
查看次数

将 Flux 条目与 previous 和 map 相结合

问题

使用 Flux 如何访问前一个元素?

背景

我有一个外部事件流,它按顺序提供事件,该流的顺序是调度一个事件,然后立即调度另一个事件。然而,第二个事件的元数据位于第一个事件中。

请注意,事件数量并不总是偶数。

我想做的是将事件组合成事件流以供下游使用。

Flux#zip看起来很有希望,但这意味着返回外部事件类型的对象。

初始代码

到目前为止我所拥有的是。

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    } …
Run Code Online (Sandbox Code Playgroud)

java project-reactor

0
推荐指数
1
解决办法
2024
查看次数

当客户端中止请求时,WebFlux 如何停止发布者?

SpringBoot v2.5.1

有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).

客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。

通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?

@RestController 
class EndpointSpin {
 
  @GetMapping("/spin")
  Mono<Long> spin() {
    AtomicLong counter = new AtomicLong(0);
    Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));

    return Mono.fromCallable(() -> {

      while (Instant.now().isBefore(stopTime)) {
        counter.incrementAndGet();

        if (counter.get() % 10_000_000 == 0) {
          System.out.println(counter.get());
        }

        // of course this does not work
        if (Thread.currentThread().isInterrupted()){
           break;
        }
      }

      return counter.get();
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

java reactive-programming cancellation project-reactor spring-webflux

0
推荐指数
1
解决办法
1388
查看次数

Spring WebFlux - 如何使用 WebClient 将响应打印为字符串而不是对象

我有一个如下所示的 Mono:

    private void getNodeDetail() {
        Mono<String> mono = webClient.get()
                .uri("/alfresco/api/-default-/public/alfresco/versions/1/nodes/f37b52a8-de40-414b-b64d-a958137e89e2")
                .retrieve().bodyToMono(String.class);

        System.out.println(mono.subscribe());
        System.out.println(mono.block());
    }
Run Code Online (Sandbox Code Playgroud)

问题:第一个 sysout 向我展示了reactor.core.publisher.LambdaSubscriber@77114efe使用 block() 时的情况,它向我展示了我需要的内容(json 字符串)。但我想使用 Aysnc 方法。那么,如上所述,这是否意味着我的目标系统(在本例中为 Alfresco)不支持异步调用?subscribe()如果不是这种情况,如何使用,就像 一样以字符串格式在控制台上打印响应block()

spring alfresco spring-boot project-reactor spring-webflux

0
推荐指数
1
解决办法
2897
查看次数

为什么此 Flux 的处理会无限期地挂在大小 256 上?

我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。

这是挂起的代码:

@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
  var scheduler = Schedulers
      .newBoundedElastic(
          1000,
          1000,
          "really-big-scheduler"
      );
  Flux.range(0, uniqueStringsCount)
      .map(Object::toString)
      .repeat()
      // this represents "a lot of events"
      .take(50_000)
      .groupBy(x -> x)
      // this gets the same results
      // .parallel(400)
      .parallel()
      .flatMap(group ->
          group.concatMap(e ->

              // this represents a processing operation on each event
              Mono.fromRunnable(() …
Run Code Online (Sandbox Code Playgroud)

java concurrency group-by project-reactor

0
推荐指数
1
解决办法
804
查看次数

Mono.defualtEmpty() 与 Mono.switchIfEmpty()

当上游发出 nullValue 时,我们可以使用 'Mono.defualtIfEmpty()' 或 'Mono.switchIfEmpty()' 来替换 null 值。

switchIfEmpty()评价上游价值急切。因此我们使用 Mono.defer() 进行惰性求值。

  1. 'Mono.defualtIfEmpty()' 也是急于求值吗?像 switchIfEmpty() 一样?

  2. 如何更改 Mono.defualtIfEmpty() 来进行惰性评估?

reactive-programming project-reactor reactive spring-webflux

0
推荐指数
1
解决办法
4765
查看次数