小编ath*_*hom的帖子

Mono 超时时的“运算符调用默认 onErrorDropped”

在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重新创建这些错误:

@Test
public void testScheduler() {
    Mono<String> callableMethod1 = callableMethod();
    callableMethod1.block();

    Mono<String> callableMethod2 = callableMethod();
    callableMethod2.block();
}

private Mono<String> callableMethod() {
    return Mono.fromCallable(() -> {
        Thread.sleep(60);
        return "Success";
    })
            .subscribeOn(Schedulers.elastic())
            .timeout(Duration.ofMillis(50))
            .onErrorResume(throwable -> Mono.just("Timeout"));
}
Run Code Online (Sandbox Code Playgroud)

Mono.fromCallable我正在使用第三方库进行阻塞调用。当此调用超时时,我收到类似于

reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
Run Code Online (Sandbox Code Playgroud)

这些错误似乎也是间歇性的,有时当我运行代码时,我根本没有得到任何错误。但是,当我以 10 的循环重复调用时,我始终得到它们。

project-reactor

19
推荐指数
1
解决办法
8363
查看次数

Spring Reactor:Mono.zip在空Mono上失败

我正在使用Spring Reactor 3.1.0.M3并且有一个用例,我需要从多个来源合并Mono.我发现如果其中一个Monos是一个空Mono,zip会失败而不会出错.

例:

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        sb.append((String) string);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());
Run Code Online (Sandbox Code Playgroud)

添加m3时,响应中的组合子被跳过为空.当我删除m3时,它都按预期工作,并返回"AB".有没有办法通过检测空Mono来处理这个问题?另外,有没有办法让组合器方法知道对象的类型而不必抛出?

java spring project-reactor

5
推荐指数
2
解决办法
6267
查看次数

WebFlux 应用程序中的 WebFilter

我有一个使用 Spring Boot 2.0.0.M5/2.0.0.BUILD-SNAPSHOT 的 Spring Boot WebFlux 应用程序。我需要将跟踪 ID 添加到所有日志中。

为了让它在 WebFlux 应用程序中工作,我尝试使用此处此处描述的 WebFilter 方法

@Component
public class TraceIdFilter implements WebFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return chain.filter(exchange).subscriberContext((Context context) ->
        context.put(AuditContext.class, getAuditContext(exchange.getRequest().getHeaders()))
    );
}
Run Code Online (Sandbox Code Playgroud)

我的控制器

@GetMapping(value = "/some_mapping")
public Mono<ResponseEntity<WrappedResponse>> getResource(@PathVariable("resourceId") String id) {
    Mono.subscriberContext().flatMap(context -> {
        AuditContext auditContext = context.get(AuditContext.class);
        ...
    });
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是过滤器方法永远不会被执行,并且上下文也没有设置。我已经确认 Webfilter 在启动时已加载。还需要其他什么才能使过滤器正常工作吗?

spring spring-boot spring-webflux

5
推荐指数
2
解决办法
1万
查看次数

运行 Sonar 后无法删除构建目录

我使用 SonarQube Gradle 插件版本 3.0 和 Gradle 6.7.1 来分析我的 Java 代码。这工作正常,结果上传到声纳。当我gradlew clean test sonarqube最初这样做时,我没有问题。但是当我随后运行它时,我得到以下信息:

java.io.IOException: Unable to delete directory 'C:\xxx\build'

Failed to delete some children. This might happen because a process has
files open or has its working directory set in the target directory.

- C:\xxx\build\sonar\findbugs\findsecbugs-plugin.jar
- C:\xxx\build\sonar\findbugs
- C:\xxx\build\sonar
Run Code Online (Sandbox Code Playgroud)

这会一直失败,直到我杀死 Gradle 守护进程,然后我才能再次成功运行 sonarqube。有办法解决这个问题吗?

findbugs sonarqube

5
推荐指数
0
解决办法
293
查看次数

使用Spring WebClient解码内容编码gzip

我正在使用Spring WebClient(Spring 5.1.3)调用Web服务。服务以content-type: application/json和响应content-encoding: gzip

ClientResponse.bodyToMono 然后失败,出现错误“ JSON解码错误:非法字符((CTRL-CHAR,代码31))”,我认为这是因为在尝试解析JSON之前尚未对内容进行解码。

这是我如何创建WebClient的代码段(简化)

HttpClient httpClient = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
Run Code Online (Sandbox Code Playgroud)

然后,我使用WebClient进行呼叫:

webClient.get().uri(uri)
    .accept(MediaType.APPLICATION_JSON)
    .header(HttpHeaders.ACCEPT_ENCODING, "gzip")
    .exchange()
Run Code Online (Sandbox Code Playgroud)

HTTP请求具有2个标头:

Accept: application/json
Accept-Encoding: gzip
Run Code Online (Sandbox Code Playgroud)

响应具有以下标头:

set-cookie: xxx
content-type: application/json; charset=utf-8
content-length: 1175
content-encoding: gzip
cache-control: no-store, no-cache
Run Code Online (Sandbox Code Playgroud)

通过执行以下操作,我能够手动解码GZIP内容并从结果中获取有效的JSON

webClient.get().uri(uri)
        .accept(MediaType.APPLICATION_JSON)
        .header("accept-encoding", "gzip")
        .exchange()
        .flatMap(encodedResponse -> encodedResponse.body((inputMessage, context) ->
                inputMessage.getBody().flatMap(dataBuffer -> {
                    ClientResponse.Builder decodedResponse = ClientResponse.from(encodedResponse);
                    try {
                        GZIPInputStream gz = new GZIPInputStream(dataBuffer.asInputStream());
                        decodedResponse.body(new String(gz.readAllBytes()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    decodedResponse.headers(headers …
Run Code Online (Sandbox Code Playgroud)

spring reactor-netty spring-webflux

3
推荐指数
1
解决办法
1329
查看次数

自定义 MessageConverter 在 Spring Boot 中不起作用

我想MessageConverter在 Spring Boot 中注册一个自定义,让我更好地控制消息的转换方式。我有一个 Spring Boot 2.1.7 Webflux 应用程序。

我创建了我的自定义转换器:

public class CustomConverter extends MappingJackson2HttpMessageConverter {
    ...
}
Run Code Online (Sandbox Code Playgroud)

并将其注册为:

@Bean
public MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter(ObjectMapper objectMapper) {
    return new CustomConverter();
}
Run Code Online (Sandbox Code Playgroud)

我还确认它是通过自动装配注册的,HttpMessageConverters并确认它在转换器列表中。然后,我实现了一些方法,我期待着这样调用canReadcanWritereadInternalwriteInternal他们并把断点。我的应用程序公开了一个 Rest 端点,该端点以 JSON 响应,并调用一个外部 Rest 服务,该服务发布 JSON 有效负载并接收 JSON 作为回报。

我在任何时候都看不到我的转换器正在被使用。我什至删除了所有其他转换器,只留下我自己的转换器,但仍然没有被调用。

我错过了什么吗?

spring-boot spring-webflux

1
推荐指数
1
解决办法
1552
查看次数