小编Arn*_*lle的帖子

Spring boot 2.1.5、WebFlux、Reactor:如何正确处理MDC

Spring Boot 2.1.5 项目反应堆 3.2.9

我正在使用上述框架设置一堆其余的响应式 API,并且遇到了 MDC(映射诊断上下文)的恼人问题。我的应用程序是在 JAVA 中。

MDC 依赖线程局部变量来存储当前查询的映射上下文以放入日志。显然,该系统并不完美,并且与响应式模式相矛盾,因为执行的不同步骤将通过不同的线程执行。

我在 Play Reactive 框架中遇到了同样的问题,但通过将映射上下文从一个演员透明地复制到另一个演员,找到了一种解决方法。

对于 spring 和 reactor,我还没有找到令人满意的解决方案。

在互联网上找到的一些随机示例:

首先 - 它有效,但迫使你使用一堆实用方法

一样

第二 - 它试图在 onNext 发布者事件期间复制上下文,但似乎在这样做的过程中丢失了一些功能。例如,信号上下文丢失了。

我需要一个适当的解决方案来处理这个问题:

  • 一个可以在 MDC 和 reactor 之间建立链接的库?
  • 一种调整反应器/弹簧以透明地实现它的方法?
  • 有什么建议吗?

spring-boot project-reactor spring-webflux

8
推荐指数
1
解决办法
2749
查看次数

在 Condition 或 ConfigurationCondition 中使用上下文 bean

我希望使用 org.springframework.context.annotation.Condition 接口在启动时禁用/启用我的一些 @Configuration 类。下面的例子:

public class Condition implements ConfigurationCondition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        //Gets the configuration manager from the context
        RapidConfDocConfiguration.DocConf docConf = context.getBeanFactory().getBean(DocConf.class);

        return docConf.isEnabled();
    }
}

@Configuration
public class ConfDocConfiguration {

    @Bean
    public DocConf docConf() {
        return new DocConf(true);
    }
}

@Configuration
@Import(ConfDocConfiguration.class)
@Conditional({Condition.class})
public class RapidSwaggerConfiguration {
    //Some configurations
}
Run Code Online (Sandbox Code Playgroud)

我这里遇到的问题是条件在实例化任何上下文之前执行,然后“DocConf”还不存在。如果我读取环境变量或类似的内容,则条件效果很好。

那么,是否可以基于上下文的 spring bean 使用这种条件?

spring spring-boot

7
推荐指数
1
解决办法
2628
查看次数

Spring webflux 错误处理程序:如何在错误处理程序中获取请求的反应器上下文?

Spring Boot 2.1.5 项目反应堆 3.2.9

在我的 webflux 项目中,我广泛使用 reactor 上下文来传递一些值。

我在这里的目的是能够获取异常处理程序内部的上下文。

一个简单的例子:

@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends
    AbstractErrorWebExceptionHandler {

    public GlobalErrorWebExceptionHandler(ErrorAttributes errorAttributes, ResourceProperties resourceProperties, ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
        super(errorAttributes, resourceProperties, applicationContext);
        this.setMessageWriters(configurer.getWriters());
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(
      ErrorAttributes errorAttributes) {
        return RouterFunctions
            .route(RequestPredicates.all(), request -> {
                Throwable error = errorAttributes.getError(request);

                return ServerResponse.status(500).syncBody(error.getMessage()).doOnEach(serverResponseSignal -> {
                    //Here the context is empty because i guess i created a new flux
                    System.out.println("What is in my context ? " + serverResponseSignal.getContext());
                    System.out.println("What is …
Run Code Online (Sandbox Code Playgroud)

project-reactor spring-webflux

7
推荐指数
1
解决办法
3062
查看次数

Kafka 流:内部状态存储的“TopicAuthorizationException:无权访问主题”

Java:OpenJdk 11 Kafka:2.2.0 Kafka 流库:2.3.0

我正在尝试在docker容器中部署我的 Kafka 流应用程序,但在尝试使用 TopicAuthorizationException 创建内部状态存储时失败。它在本地运行良好。本地和服务器之间的主要区别在于,它连接到部署了 Kafka 的服务器并使用通常的Kerberos身份验证进行身份验证。我无法理解身份验证和本地商店之间的联系。

我的流看起来像这样:

StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
                .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
2万
查看次数

Gradle Kotlin DSL:子项目和插件的问题

摇篮6.1.1

我一直在尝试使用 Kotlin DSL 转换项目的 gradle 文件,但到目前为止失败了。我的所有项目都是用 Java 进行的多项目构建。我在这里遵循插件子项目示例

看起来像这样:

plugins {
    idea
    eclipse
}

subprojects {
    apply(plugin = "java")

    dependencies {
       implementation("com.google.guava:guava:28.1-jre")
       //...
    }
}
Run Code Online (Sandbox Code Playgroud)

java插件似乎在子项目中不被理解,并且所有“实现”行都得到了未解析的引用。

gradle gradle-kotlin-dsl

6
推荐指数
1
解决办法
4371
查看次数

Spring webflux 过滤器:如何在查询执行后获取反应器上下文?

Spring Boot 2.1.5 项目反应堆 3.2.9

在我的 webflux 项目中,我广泛使用反应器上下文来传递一些值。

我设置了一个过滤器,并试图记录上下文中的内容,并在出现错误/成功时记录不同的内容。

我检查了这个文档:https : //projectreactor.io/docs/core/release/reference/#context

我仍然在努力(尤其是在错误方面)来获得它。

基本上,我有这个过滤器:

@Component
public class MdcWebFilter implements WebFilter {

    @NotNull
    @Override
    public Mono<Void> filter(@NotNull ServerWebExchange serverWebExchange,
                             WebFilterChain webFilterChain) {

        Mono<Void> filter = webFilterChain.filter(serverWebExchange);

        return filter
            .doAfterSuccessOrError(new BiConsumer<Void, Throwable>() {
                @Override
                public void accept(Void aVoid, Throwable throwable) {
                    //Here i would like to be able to access to the request's context
                    System.out.println("doAfterSuccessOrError:" + (throwable==null ? "OK" : throwable.getMessage())+"log the context");
                }
            })
            .doOnEach(new Consumer<Signal<Void>>() {
                @Override
                public void …
Run Code Online (Sandbox Code Playgroud)

project-reactor spring-webflux

5
推荐指数
1
解决办法
6422
查看次数

如何使用 Junit5 为每个测试使用单独的类加载器

我一直在尝试设置一个 Junit 5 扩展来强制每个测试获得一个单独的类加载器。我可以在 Junit4 中轻松完成,创建我自己的 BlockJUnit4ClassRunner。但是,我现在无法让它工作。

目的是能够测试诸如静态块或不同状态下的记忆字段之类的东西。

到目前为止,我一直在尝试使用 TestInstanceFactory ,但没有成功:

public class SeparateClassLoaderExtension implements TestInstanceFactory {

    @SneakyThrows
    @Override
    public Object createTestInstance(TestInstanceFactoryContext factoryContext, ExtensionContext extensionContext) throws TestInstantiationException {
        ClassLoader testClassLoader = new TestClassLoader();
        final Class<?> testClass = Class.forName(factoryContext.getTestClass().getName(), true, testClassLoader);

        Constructor<?> defaultConstructor = testClass.getDeclaredConstructor();
        defaultConstructor.setAccessible(true);
        return defaultConstructor.newInstance();
    }
}
Run Code Online (Sandbox Code Playgroud)

我从 Junit 那里得到一个例外,说这个类的类型不正确。

有人有什么想法吗?

java junit5

5
推荐指数
1
解决办法
451
查看次数

用于集成测试的 docker 中的 Couchbase:使端口 8092、8093、8094 和 8095 可配置为能够使用 docker 的随机端口

我正在使用 Couchbase java 客户端 SDK 2.7.9 并且在尝试运行自动集成测试时遇到问题。在这样的测试中,我们通常使用随机端口来在同一个 Jenkins slave 上运行相同的东西(例如使用 docker)。

但是,对于客户端,我们可以指定许多自定义端口,但不能指定 8092、8093、8094 和 8095。

流行的 TestContainers 模块也提到这些端口必须在它们的 Couchbase 模块中保持静态:https ://www.testcontainers.org/modules/databases/couchbase/ 1

显然,也可以在服务器级别更改这些端口。

例子:

Docker-compose.yml

version: '3.0'
services:
  rapid_test_cb:
    build:
      context: ""
      dockerfile: cb.docker
    ports:
      - "8091"
      - "8092"
      - "8093"
      - "11210"
Run Code Online (Sandbox Code Playgroud)

docker 镜像是“couchbase:community-5.1.1”

内部端口是上面写的端口,但外部它们是随机的。在客户端级别,您可以设置 bootstrapHttpDirectPort 和 bootstrapCarrierDirectPort,但显然 8092 和 8093 端口是从服务器端获取的(谁不知道分配给他的是哪个端口)。

我想问您是否可以在客户端级别更改这些端口,如果不能,请认真考虑添加该功能。

couchbase docker couchbase-java-api

5
推荐指数
1
解决办法
329
查看次数

带有自定义组 ID 的 Gradle 插件

摇篮 6.1

我在使用来自自定义存储库的自定义插件的 Gradle 中使用新的插件配置模式时遇到了困难。

buildscript {
    repositories {
        maven {
            url = uri("https://custom")
        }
        mavenCentral()
        jcenter()
        maven {
            url = uri("https://plugins.gradle.org/m2/")
        }
    }
}

plugins {
    java
    idea
    id("com.custom.gradle.plugin.myplugin") version "1.1.0"
}
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

Plugin [id: 'com.custom.gradle.plugin.myplugin', version: '1.1.0'] was not found in any of the following sources:

- Gradle Core Plugins (plugin is not in 'org.gradle' namespace)
- Plugin Repositories (could not resolve plugin artifact 'com.custom.gradle.plugin.myplugin:com.custom.gradle.plugin.myplugin:1.1.0')
  Searched in the following repositories:
    Gradle Central Plugin Repository
Run Code Online (Sandbox Code Playgroud)

Gradle 将使用插件 ID 作为其组 …

gradle gradle-plugin

5
推荐指数
1
解决办法
588
查看次数