标签: reactor-kafka

在 Spring Boot 应用程序中实现 Reactive Kafka Listener

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中尚不支持 Spring

我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但是我现在不确定如何在 Spring 中正确使用反应式 kafka。

基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka

7
推荐指数
1
解决办法
2036
查看次数

使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的有效负载

我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们。在接收 kakfa 有效负载时,我会进行一些反序列化,如果出现异常,我只想记录该有效负载(通过保存到 mongo ),然后继续接收其他有效负载。

为此,我使用以下方法 -

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
       flux.delayUntil(//some function to do something)
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException()) // here I'll just save to mongo 
       .subscribe();
   }
}


private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
 // save to mongo
 return Flux.empty();
}
Run Code Online (Sandbox Code Playgroud)

在这里,我希望每当我在接收有效负载时遇到异常时, onErrorResume 应该捕获它并记录到 mongo ,然后当我发送到 kafka 队列时我应该很好地继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。我在这里可能缺少什么吗?

apache-kafka spring-boot project-reactor reactor-kafka

6
推荐指数
1
解决办法
1955
查看次数

Reactor onErrorContinue 运算符是否让原始序列继续?

Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。

在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。

但 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -

通过从序列中删除有罪的元素并继续处理后续元素,让上游兼容运算符从错误中恢复。

onErrorContinue 不被视为“错误处理运算符”吗?

它似乎确实允许原始序列继续 -

        Flux.range(1, 5)
                .map(i -> {
                    if (i == 3) {
                        throw new RuntimeException("Forcing exception for " + i);
                    }
                    return i;
                })
                .doOnNext(i -> System.out.println(i))
                .onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
                .subscribe();
Run Code Online (Sandbox Code Playgroud)

结果(删除了 3 个但继续后续元素)

1
2
4
5
Error while processing 3 - Forcing exception for 3

Process …
Run Code Online (Sandbox Code Playgroud)

java project-reactor reactor-kafka

5
推荐指数
1
解决办法
2472
查看次数

ReactiveKafkaProducerTemplate 发送方法中的 Mock SenderResult

我正在尝试模拟reactiveKafkaConsumerTemplate 的发送方法。

 @Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;

 Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
        .thenReturn(createConsumerRecords(2));



Mockito.when(reactiveKafkaProducerTemplate
.send(Mockito.anyString(),Mockito.anyString(),Mockito.anyList()))
                    .thenReturn(???);
Run Code Online (Sandbox Code Playgroud)

我试图模拟reactiveProducerTemplate 的send 方法以返回SenderResult。可以这样做吗?如果是,有人可以指出我的文档/示例来执行此操作。我花了很多时间寻找解决方案,但找不到任何解决方案。

更新:我根据加里的建议尝试了以下操作

ProducerRecord<String, List<Object>> record 
= new  ProducerRecord<String, List<Object>>(topic,"key", objectSetup.setup());
RecordMetadata meta 
= new RecordMetadata(new TopicPartition("topic",0),0,0,0,(long)1,2,1);
 
Mockito.when(reactiveKafkaProducerTemplate.send(topic,"key",objectSetup.setup())
.thenReturn(Mono.just(new SendResult<>(record, meta))));
Run Code Online (Sandbox Code Playgroud)

我在 .thenReturn(Mono.just(new SendResult<>(record, meta)))) 行收到以下异常。它没有提到异常中的 null 值,而且我没有看到任何 null 值。

java.lang.NullPointerException
    at com.ServiceTests.cTestMethod(ServiceTests.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka project-reactor spring-kafka reactor-kafka

1
推荐指数
1
解决办法
1923
查看次数

SpringBoot 与 REACTOR kafka :增加 2CPUs pod 上的消息消耗吞吐量

关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。

我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。

该应用程序正在使用一个the-topic具有三个分区的主题。

该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例

该应用程序非常简单:

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
    </dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {

    @Bean
    public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
        kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
        kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
        final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
        return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
    }

}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;


    @Override
    public void run(String... args) {
        myConsumer().subscribe();
    }

    public Flux<String> …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka reactor-kafka

1
推荐指数
1
解决办法
2159
查看次数