标签: spring-webflux

Spring webflux将实体类转换为Mono DTO对象

首先,我是 Spring Webflux 的新手,并尝试在设置反应式 Spring Boot 项目时进行 POC。我有一个用例,我需要将检索到的实体类(PartyDTO)转换为 Mono 对象(Person :是没有构造函数的第三方业务对象,我无法修改它)。我用谷歌搜索但无法找到与我的用例匹配的答案。

第三方对象:

  public class Person {
        // no constructors 
        private Integer custId;
        private String fullname;
        private LocalDate date;
        //
        getters and setters
    }
Run Code Online (Sandbox Code Playgroud)

我的课程如下:

@Table("party")
public class PartyDTO {

    @Id
    private Integer party_id;
    private String name;
    private LocalDate start_date;
}
Run Code Online (Sandbox Code Playgroud)

调用我的存储库的服务类。

 @Service
     public class ServiceImpl{

     @Override
        public Mono<Person> getParty(String partyId) {
            return 
    partyRepository.findById(Integer.parseInt(partyId)).flatMap(//mapper to convert PartyDTO to Person goes here);
        }
}
Run Code Online (Sandbox Code Playgroud)

我尝试将平面地图与我的自定义映射器一起使用,如上所示,但它不起作用。有人可以建议我如何以非阻塞方式实现这一点(如果第三方 bean 映射器支持非阻塞方法,它也很好)

java spring spring-boot spring-webflux spring-webclient

1
推荐指数
1
解决办法
9713
查看次数

保留反应流中的上下文

有人可以帮助我理解上下文是如何在反应流中传递的。例如,请参见下面的代码:

Flux<Integer> expectedFluxWithContext = Flux.just(1, 2, 3, 4)
        .flatMap(item -> Mono.just(item).contextWrite(Context.of("traceId", item)))
        .doOnEach(signal -> System.out.println(signal.getContextView()));
Run Code Online (Sandbox Code Playgroud)

当我运行上面的代码时,我得到了我所期望的正确数据,但在doOnEach运算符中上下文是空的。

任何人都可以帮助我了解如何在流中共享上下文以及我可以做出哪些更改来完成这项工作。

java reactor reactive-programming project-reactor spring-webflux

1
推荐指数
1
解决办法
3296
查看次数

Java Rest 客户端 bodyToMono 通用

我的应用程序有一些针对不同休息端点的服务。逻辑总是相同的,所以我想使用继承和泛型来避免代码重复。但一行(bodyToMono(E[].class))不起作用。我不明白还有其他选择(也许是最佳实践方式)吗?

家长班级:

@Configuration
@Service
public abstract class AbstractEntitiesRestService<E>{

   protected abstract String getBaseUrl();

   @Autowired
   @Qualifier("webClient")
   protected WebClient WebClient;

    @Override
    public E[] getObjectsFromCustomQueryImpl(CustomQuery query) {
        return jtWebClient.get()
                .uri(getBaseUrl())
                .retrieve()
                .bodyToMono(E[].class) <---- Error!
                .block();
    }
}
Run Code Online (Sandbox Code Playgroud)

儿童班:

@Configuration
@Service
public class UserService extends AbstractEntitiesRestService<User> {

    @Value("${endpoint}")
    protected String baseUrl;

    @Override
    protected  String getBaseUrl(){
        return baseUrl;
    }
    
    ...

}
Run Code Online (Sandbox Code Playgroud)

java generics webclient spring-webflux

1
推荐指数
1
解决办法
4259
查看次数

如何将 Flux 的每个元素与需要元素属性的另一个 Flux 组合起来

我有一个关于 Spring WebFlux 和 Reactor 的问题。我正在尝试编写一个简单的场景,其中在 GET 端点中,我返回代表实体的 DTO Flux,并且每个实体都有代表另一个实体的其他 DTO 的集合。这里请遵循详细信息。

我有两个实体,Person 和 Song,定义如下:

@Data
public class Person {
    @Id
    private Long id;
    private String firstName;
    private String lastName;
}

@Data
public class Song {
    @Id
    private Long id;
    private String title;
    private Long authorId;
}
Run Code Online (Sandbox Code Playgroud)

这些实体由以下 DTO 表示:

@Data
public class SongDTO {
    private Long id;
    private String title;
    public static SongDTO from(Song s) {
        // converts Song to its dto
    }
}

@Data
public class PersonDTO { …
Run Code Online (Sandbox Code Playgroud)

java project-reactor spring-webflux

1
推荐指数
1
解决办法
4429
查看次数

Project Reactor Flux conCat、flux mergeSequential、flux mergeOrdered 之间有什么区别

如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别呢?

java reactive-programming project-reactor reactive-streams spring-webflux

1
推荐指数
1
解决办法
1699
查看次数

如果创建热门发布者,flux cache()、replay() 和publish() 有什么区别?

cache()Flux和replay()ifpublish()创建热门发布者有什么区别?对于哪种用例,哪种运算符最适合?

以下示例使用 3 种不同的方法重播所有 5 个元素。

cache()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).cache();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)

replay()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).replay();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe();

        flux.connect();

        Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)

publish()

        var flux = Flux.fromStream(Stream.of(1,2,3,4,5))
                .delayElements(Duration.ofSeconds(1)).publish();

        flux.doOnNext(v -> System.out.println("First: " + v))
        .subscribe();

        Thread.sleep(5000);

        flux.doOnNext(v -> System.out.println("Second: " + v))
                .subscribe(); …
Run Code Online (Sandbox Code Playgroud)

project-reactor spring-webflux

1
推荐指数
1
解决办法
1799
查看次数

如何在 Spring 中有效地将排序后的通量分组为序列化组?

假设我有一个Flux包含许多(数十亿个字符串)的输入,如下所示:

  • 苹果
  • 应用
  • 圣经

这样的字符串有数十亿个,它们无法放入内存中,这就是我想使用反应式方法的原因。

流已排序。现在我想要的是通过前 3 个字符创建一系列有序字符串组:

  • 应用程序:苹果,应用程序
  • 围兜:圣经
  • 嘘:书

Flux最终会出现 HTTP 响应,这意味着所有“app”项目必须在“bib”项目开始之前输出。

如果不使用,Flux我可以使用有序属性并将项目收集到准备好的存储桶中(每个存储桶的字符串数量将适合内存) - 每当前缀发生变化时,我将刷新存储桶并开始收集新的前缀。有序流的一大优点是我知道一旦遇到新的前缀,旧的就不会再出现了。

但使用Flux我不知道如何做到这一点。将会.groupBy()返回FluxFlux但我认为在尝试将其序列化到 HTTP 响应输出流时这不会起作用。

spring flux project-reactor spring-webflux

1
推荐指数
1
解决办法
489
查看次数

如何返回包含 Reactive Mono 和 Flux 的 Reactive Flux?

我是反应式编程的新手,遇到了这个问题:

[
    {
        "customerDTO": {
            "scanAvailable": true
        },
        "bankAccountDTOs": {
            "scanAvailable": true,
            "prefetch": -1
        }
    }
]
Run Code Online (Sandbox Code Playgroud)

数据传输对象:

public class ResponseClientDTO {
    private Mono<CustomerDTO> customerDTO;
    private Flux<BankAccountDTO> bankAccountDTOs;
}
Run Code Online (Sandbox Code Playgroud)

服务:

public Flux<ResponseClientDTO> getCustomerWithBankAccounts(String customerId){
    Flux<BankAccountDTO> bankAccounts = webClient
        .get()
        .uri(uriBuilder -> 
                uriBuilder.path("customers")
                .queryParam("customerId", customerId).build())
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToFlux(BankAccountDTO.class);
        
    
    Mono<CustomerDTO> cMono = findOne(customerId);
    
    ResponseClientDTO responseClientDTO = new ResponseClientDTO();
    responseClientDTO.setBankAccountDTOs(bankAccounts);
    responseClientDTO.setCustomerDTO(cMono);
    
    return Flux.just(responseClientDTO);
}
Run Code Online (Sandbox Code Playgroud)

我从另一个 API 查询端点,它返回一个Flux<BankAccounts>. 我想找到客户所有的银行账户。

java spring spring-webflux spring-webclient

1
推荐指数
1
解决办法
1850
查看次数

如何将delayElements()与Flux合并一起使用?

我正在学习教程,我相信我的代码与讲师的代码相同,但我不明白为什么delayElements()不起作用。

这是调用者方法:

public static void main(String[] args) {
    FluxAndMonoGeneratorService fluxAndMonoGeneratorService = new FluxAndMonoGeneratorService();
    fluxAndMonoGeneratorService.explore_merge()
            .doOnComplete(() -> System.out.println("Completed !"))
            .onErrorReturn("asdasd")
            .subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

如果我将没有延迟元素的方法编写为:

public Flux<String> explore_merge() {

        Flux<String> abcFlux = Flux.just("A", "B", "C");
        Flux<String> defFlux = Flux.just("D", "E", "F");

        return Flux.merge(abcFlux, defFlux);
    }
Run Code Online (Sandbox Code Playgroud)

然后控制台中的输出是(如预期):

00:53:19.443 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
A
B
C
D
E
F
Completed !

BUILD SUCCESSFUL in 1s
Run Code Online (Sandbox Code Playgroud)

但我想使用delayElements()来测试merge()方法:

public Flux<String> explore_merge() {

        Flux<String> abcFlux = Flux.just("A", "B", …
Run Code Online (Sandbox Code Playgroud)

java project-reactor spring-webflux

1
推荐指数
1
解决办法
1992
查看次数

Spring webflux 与 jwt + csrf 令牌

我需要对我的后端实施 CSRF 保护。我正在使用以下配置。但应用程序允许没有 CSRF 令牌的 Post 和 Get 请求。

@Slf4j
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {

    @Bean
    SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        http
                .csrf(csrf -> csrf.csrfTokenRepository(CookieServerCsrfTokenRepository.withHttpOnlyFalse()))
                .authorizeExchange()
                .anyExchange().authenticated()
                .and().oauth2ResourceServer().jwt();
        return http.build();
    }
}
Run Code Online (Sandbox Code Playgroud)

在 HTTP 请求中包含实际的 CSRF 令牌

 @ControllerAdvice
    public class SecurityControllerAdvice {
        @ModelAttribute
        Mono<CsrfToken> csrfToken(ServerWebExchange exchange) {
            Mono<CsrfToken> csrfToken = exchange.getAttribute(CsrfToken.class.getName());
            return csrfToken.doOnSuccess(token -> {
                exchange.getAttributes()
                        .put(CsrfRequestDataValueProcessor.DEFAULT_CSRF_ATTR_NAME, token);
            });
        }
    }
Run Code Online (Sandbox Code Playgroud)

我尝试使用邮递员的API。但这对我不起作用。

春季版

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/>
   </parent> 
Run Code Online (Sandbox Code Playgroud)

依赖项:

<dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-resource-server</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-jose</artifactId>
            <version>RELEASE</version>
        </dependency> …
Run Code Online (Sandbox Code Playgroud)

csrf spring-security vue.js spring-webflux

1
推荐指数
1
解决办法
1033
查看次数