小编Bri*_*zel的帖子

Flux未在Spring 5反应堆中订购

我可能错过了一些东西,但我无法弄清楚它是什么.

以下代码什么都不做:

webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();
Run Code Online (Sandbox Code Playgroud)

如果我试图阻止呼叫它工作正常:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();
Run Code Online (Sandbox Code Playgroud)

奇怪的是,如果我"手动"创建一个Flux(即不是来自spring webClient),这很好用:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像它在最后一样?

谢谢!

java reactive-programming project-reactor

4
推荐指数
1
解决办法
1103
查看次数

当 Flux 为空时返回 404

当 Flux 为空时,我试图返回 404,类似于此处:WebFlux 功能:如何检测空 Flux 并返回 404?

我主要担心的是,当您检查通量是否包含元素时,它会发出该值,而您会丢失它。当我尝试在服务器响应上使用 switch if empty 时,它永远不会被调用(我偷偷认为这是因为 Mono 不是空的,只有主体是空的)。

我正在做的一些代码(我的路由器类上有一个过滤器,检查 DataNotFoundException 以返回 notFound):

Flux<Location> response = this.locationService.searchLocations(searchFields, pageToken);
return ok()
        .contentType(APPLICATION_STREAM_JSON)
        .body(response, Location.class)
        .switchIfEmpty(Mono.error(new DataNotFoundException("The data you seek is not here.")));
Run Code Online (Sandbox Code Playgroud)

^这从不调用 switchIfEmpty

Flux<Location> response = this.locationService.searchLocations(searchFields, pageToken);

return response.hasElements().flatMap(l ->{
   if(l){
       return ok()
               .contentType(APPLICATION_STREAM_JSON)
               .body(response, Location.class);
   } 
   else{
       return Mono.error(new DataNotFoundException("The data you seek is not here."));
   }
});
Run Code Online (Sandbox Code Playgroud)

^这会丢失 hasElements 上的发射元素。

有没有办法在 hasElements 中恢复发出的元素,或者让 switchIfEmpty 只检查主体的内容?

project-reactor spring-webflux

4
推荐指数
2
解决办法
4838
查看次数

从 Spring Boot 2.7.2 升级到 Spring Boot 3.0.0-SNAPSHOT:

代码

package com.example.BLModel;

import org.hibernate.annotations.Type;

import javax.persistence.Column;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.MapsId;
import javax.persistence.Table;

@Entity
@Table(name = "email_template_custom")
public class EmailTemplateCustom {
    @EmbeddedId
    private EmailTemplateCustomId id;

    @MapsId("tenantId")
    @ManyToOne(fetch = FetchType.LAZY, optional = false)
    @JoinColumn(name = "tenant_id", nullable = false)
    private Tenant tenant;

    @Column(name = "email_subject", nullable = false, length = 1000)
    private String emailSubject;

    @Column(name = "email_content")
    @Type(type = "org.hibernate.type.TextType")
    private String emailContent;

    public EmailTemplateCustomId getId() {
        return id;
    }

    public void setId(EmailTemplateCustomId …
Run Code Online (Sandbox Code Playgroud)

spring hibernate spring-boot

4
推荐指数
1
解决办法
6280
查看次数

django-allauth,限制登录尝试失败的建议

我正在使用django-axes来限制对admin后端的登录尝试.但是,对于我的前端客户端通过django-allauth登录,我找不到任何机制来检测和防止登录失败.

使用allauth防止多次失败登录尝试的最佳方法是什么?有推荐的解决方案吗?我不太热衷于阻止来自特定IP的尝试,但是防止对单个用户名的多次尝试即"管理员"是好的.

提前致谢,

django django-authentication django-allauth

3
推荐指数
1
解决办法
852
查看次数

将 spring Websession 与 spring 反应式 Web 通量集成

这里是位于春装新款反应网络流量API的新实现会话的HTTP这里。我想将最新的 spring web session 集成到新的 spring 反应网络流量中。我似乎无法得到它,我尝试将它作为 bean 注入,但它不起作用。我想像往常一样注入 HttpSession

就像是

@Autowired
Websession webSession;
Run Code Online (Sandbox Code Playgroud)

spring spring-boot spring-webflux

3
推荐指数
1
解决办法
1425
查看次数

编译发现gradle的错误版本

我使用spring boot2。在一个多Java项目中。

我尝试构建我的主库(尚无Java文件)

apply plugin: 'java-library-distribution'


plugins {
    id 'org.springframework.boot' version '2.0.0.M7'
}

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa', version: '2.0.0.M7'
    compile group: 'org.postgresql', name: 'postgresql', version: '42.1.4'
        testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test', version: '2.0.0.M7'
}

distributions {
    main{
        baseName = 'common-model'
    }
}

sourceCompatibility = 1.8

tasks.withType(JavaCompile) {
    options.compilerArgs = ["-Xlint:unchecked", "-Xlint:deprecation", "-parameters"]
}
Run Code Online (Sandbox Code Playgroud)

在我的gradle /包装器中,我有

distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.2-bin.zip
Run Code Online (Sandbox Code Playgroud)

我得到的错误

引起原因:org.gradle.api.internal.plugins.PluginApplicationException:无法应用插件[id'org.springframework.boot']

原因:org.gradle.api.GradleException:Spring …

gradle spring-boot

3
推荐指数
1
解决办法
6840
查看次数

Flux.concat 和 Flux.concatWith 之间的区别

我是反应流新手,正在学习使用 concat/concatWith 方法组合两个发布者(具体来说是 Flux)。

我可以用 concat 方法做的所有事情,都可以使用 concatWith 方法来实现。这是我使用的示例案例:

        Mono<String> mono1 = Mono.just(" karan ");
        Mono<String> mono2 = Mono.just(" | verma ");
        Mono<String> mono3 = Mono.just(" | kv ");

        Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
        Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");

        // FLux emits item each 500ms
        Flux<String> intervalFlux1 = Flux.interval(Duration.ofMillis(1000))
                                        .zipWith(flux1, (i, string) -> string);

        // FLux emits item each 700ms       
        Flux<String> intervalFlux2 = Flux
                                .interval(Duration.ofMillis(1000))
                                .zipWith(flux2, (i, string) -> string);



        System.out.println("**************Flux …
Run Code Online (Sandbox Code Playgroud)

reactive-programming project-reactor reactive-streams

3
推荐指数
1
解决办法
8510
查看次数

Spring WebFlux(反应器)。zipWith 时出错 - 由于缺少请求而无法发出滴答声

我有一个 Flux,对于每个对象,我应该对第三方 REST 进行 API 调用(大约 1000 次调用)。为了防止每秒出现许多请求,我使用:

    Flux<Calls> callsIntervalFlux=
            Flux.interval(Duration.ofMillis(100))
                    .zipWith(callsFlux, (i, call) -> call);

// and now Calls emits every 10ms, and REST API is not overloaded
Run Code Online (Sandbox Code Playgroud)

问题是,有时应用程序会因异常而失败:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Run Code Online (Sandbox Code Playgroud)

有没有我可以添加的逻辑来防止错误,或者只是跳过这个勾号?

project-reactor

3
推荐指数
1
解决办法
2113
查看次数

组合两个不同类型的 Flux 实例

使用 SpringBoot 2 和 Poi 类(兴趣点):

public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}
Run Code Online (Sandbox Code Playgroud)

我有 2 个 Poi 通量:

Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;
Run Code Online (Sandbox Code Playgroud)

第一个元素availablePoisFlux包含 Pois:

  • 一个 poidId
  • 没有纬度信息
  • 没有经度信息
  • 价格信息

第二个元素poiFlux包含 Pois 和:

  • 一个 poidId
  • 纬度
  • 经度
  • 没有价格信息

(poidId 是 Poi 的标识符)。

我想从两个 Flux(poiFlux 和 availablePoisFlux)创建一个带有 Pois(带有 poidId、价格、经度和纬度)的新 Flux resultPoisFlux

poidId 属性是两个 …

reactive-programming project-reactor

3
推荐指数
1
解决办法
2479
查看次数

使用Spring WebClient解码内容编码gzip

我正在使用Spring WebClient(Spring 5.1.3)调用Web服务。服务以content-type: application/json和响应content-encoding: gzip

ClientResponse.bodyToMono 然后失败,出现错误“ JSON解码错误:非法字符((CTRL-CHAR,代码31))”,我认为这是因为在尝试解析JSON之前尚未对内容进行解码。

这是我如何创建WebClient的代码段(简化)

HttpClient httpClient = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
Run Code Online (Sandbox Code Playgroud)

然后,我使用WebClient进行呼叫:

webClient.get().uri(uri)
    .accept(MediaType.APPLICATION_JSON)
    .header(HttpHeaders.ACCEPT_ENCODING, "gzip")
    .exchange()
Run Code Online (Sandbox Code Playgroud)

HTTP请求具有2个标头:

Accept: application/json
Accept-Encoding: gzip
Run Code Online (Sandbox Code Playgroud)

响应具有以下标头:

set-cookie: xxx
content-type: application/json; charset=utf-8
content-length: 1175
content-encoding: gzip
cache-control: no-store, no-cache
Run Code Online (Sandbox Code Playgroud)

通过执行以下操作,我能够手动解码GZIP内容并从结果中获取有效的JSON

webClient.get().uri(uri)
        .accept(MediaType.APPLICATION_JSON)
        .header("accept-encoding", "gzip")
        .exchange()
        .flatMap(encodedResponse -> encodedResponse.body((inputMessage, context) ->
                inputMessage.getBody().flatMap(dataBuffer -> {
                    ClientResponse.Builder decodedResponse = ClientResponse.from(encodedResponse);
                    try {
                        GZIPInputStream gz = new GZIPInputStream(dataBuffer.asInputStream());
                        decodedResponse.body(new String(gz.readAllBytes()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    decodedResponse.headers(headers …
Run Code Online (Sandbox Code Playgroud)

spring reactor-netty spring-webflux

3
推荐指数
1
解决办法
1329
查看次数