标签: spring-reactor

如何计算 Flux 中的项目数,如果计数大于 X,则返回错误,否则继续使用 Pipeline

我是 Spring 项目 Reactor 的新手,我不完全确定如何执行某些操作:

我有我的管道管道返回记录。都好。

但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则出错,否则继续。

知道 Count 返回 a Mono<Long>,那么之后我会丢失记录,我该怎么办?

我在想:

以某种方式使用flatMap并执行此平面图内的某些操作。不知何故,我发现 Flux 中有一种reduce方法可能会有所帮助。

关键是,我不知道如何继续。

java spring reactive-programming project-reactor spring-reactor

8
推荐指数
1
解决办法
1万
查看次数

R2DBC-Oracle数据库支持

我在搜索反应性关系数据库驱动程序时找到了R2DBC,但找不到任何Oracle DB驱动程序。是否有人提供是否提供支持的信息?

oracle spring-data-r2dbc r2dbc spring-reactor

5
推荐指数
3
解决办法
290
查看次数

如何在非反应式 Spring EventListener 和反应式 Flux 之间架起桥梁

通过调用Flux.push和使用pushlambda 表达式中的接收器直接创建 Flux与使用由 a 提供的接收器有DirectProcessor什么区别?

在 Flux 只发出几个事件的最小示例中,我可以做

Flux.<String>push(emitter -> {
   emitter.next("One");
   emitter.next("Two");
   emitter.complete();
 });
Run Code Online (Sandbox Code Playgroud)

与使用 DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();
Run Code Online (Sandbox Code Playgroud)

澄清一下:我知道我可以Flux.just在这里使用,但我的用例实际上是在 Spring 的@EventListeners 和 Spring WebFlux之间建立一座桥梁,我想为每个传入的特定资源的 SSE 请求创建一个 Flux,然后将事件发布到这个通量。

有人能告诉我,这两种方法是否都有效?当然,肯定有一些不同。特别是,Reactor Reference Guide部分关于DirectProcessor状态:

另一方面,它具有不处理背压的限制。因此,如果您通过 DirectProcessor 推送 N 个元素但至少其中一个订阅者的请求少于 N,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。

这意味着什么?

[编辑:]在我使用的问题的早期版本中,Flux.generate()而不是Flux.push(),这显然是错误的,因为 generate 最多可以创建一个事件。

[编辑 2:] @123 向我询问了我想要实现的目标的完整示例。忍受我,这是一个 SO 问题的相当数量的代码:

我实际尝试做的完整示例

我想在(非反应式)Spring 域事件侦听器和反应式 Flux 之间建立一座桥梁,然后我可以在 …

spring spring-webflux spring-reactive spring-reactor

5
推荐指数
2
解决办法
1861
查看次数

Spring Reactor Webflux 调度程序并行性

对于完全非阻塞的端到端反应式调用,是否建议显式调用publishOn或subscribeOn来切换调度程序?对于 cpu 消耗或非消耗任务,始终使用并行通量来优化性能是否有利?

spring project-reactor reactor-netty spring-webflux spring-reactor

5
推荐指数
1
解决办法
5636
查看次数

WebFlux - Reactor Http Epoll 线程

我正在使用 Spring webflux。我向其余端点发送数百个并发请求。当我检查时,只有 4 个线程被共享来处理所有负载。

这是正常的吗?有没有弹簧属性可以增加这个计数?

  • reactor-http-epoll-1
  • reactor-http-epoll-2
  • reactor-http-epoll-3
  • reactor-http-epoll-4

我确实知道我可以使用反应堆调度程序来卸载阻塞工作。我的问题更多的是 - 这 4 个线程是什么以及我们在哪里有这个配置?

project-reactor spring-webflux spring-reactor

5
推荐指数
1
解决办法
1万
查看次数

为什么 ConnectableFlux.connect() 会阻塞?

我是 Spring Reactor 的新手。我一直在尝试了解ConnectableFlux类的工作原理。我已经阅读了文档并看到了在线发布的示例,但我遇到了一个问题。

有人能告诉我为什么connect()方法会阻塞吗?我在文档中没有看到任何内容表明它应该阻止......特别是因为它返回一个 Disposable 供以后使用。鉴于下面的示例代码,我从未跳过 connect() 方法。

我试图基本上模拟我过去多次使用过的旧式监听器接口范例。我想学习如何使用反应流重新创建服务类和侦听器架构。我有一个简单的服务类,它有一个名为“ addUpdateListener(Listener l) ”的方法,然后当我的服务类“ doStuff() ”方法时,它会触发一些事件传递给任何侦听器。

我应该说我将编写一个API供其他人使用,所以当我说Service类时我并不是指Spring术语中的@Service。它将是一个普通的 java 单例类。

我只是将 Spring Reactor 用于反应流。我也在研究 RxJava ..但想看看 Spring Reactor Core 是否可以工作。

我从下面的一个测试类开始只是为了理解库语法,然后陷入了阻塞问题。

我想我正在寻找的内容在这里描述:多个订阅者

更新:通过调试器运行我的代码,ConnectableFlux 连接方法内的代码永远不会返回。它挂在内部连接方法上并且永远不会从该方法返回。

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
        Disposable[] out = new Disposable[]{null};
        this.connect((r) -> {
            out[0] = r;
        });
        return out[0];
    }
Run Code Online (Sandbox Code Playgroud)

任何帮助都会很棒!

这也是我的 Maven pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringReactorTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties> …
Run Code Online (Sandbox Code Playgroud)

spring-reactor

4
推荐指数
1
解决办法
1635
查看次数

2
推荐指数
1
解决办法
1176
查看次数