我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java
看起来反应式 kafka 中尚不支持 Spring
我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧
但是我现在不确定如何在 Spring 中正确使用反应式 kafka。
基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?
java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka
我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们。在接收 kakfa 有效负载时,我会进行一些反序列化,如果出现异常,我只想记录该有效负载(通过保存到 mongo ),然后继续接收其他有效负载。
为此,我使用以下方法 -
@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
flux.delayUntil(//some function to do something)
.doOnNext(r -> r.receiverOffset().acknowledge())
.onErrorResume(this::handleException()) // here I'll just save to mongo
.subscribe();
}
}
private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
// save to mongo
return Flux.empty();
}
Run Code Online (Sandbox Code Playgroud)
在这里,我希望每当我在接收有效负载时遇到异常时, onErrorResume 应该捕获它并记录到 mongo ,然后当我发送到 kafka 队列时我应该很好地继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。我在这里可能缺少什么吗?
Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。
在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。
但 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -
通过从序列中删除有罪的元素并继续处理后续元素,让上游兼容运算符从错误中恢复。
onErrorContinue 不被视为“错误处理运算符”吗?
它似乎确实允许原始序列继续 -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
结果(删除了 3 个但继续后续元素)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process …Run Code Online (Sandbox Code Playgroud) 我正在尝试模拟reactiveKafkaConsumerTemplate 的发送方法。
@Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;
Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
.thenReturn(createConsumerRecords(2));
Mockito.when(reactiveKafkaProducerTemplate
.send(Mockito.anyString(),Mockito.anyString(),Mockito.anyList()))
.thenReturn(???);
Run Code Online (Sandbox Code Playgroud)
我试图模拟reactiveProducerTemplate 的send 方法以返回SenderResult。可以这样做吗?如果是,有人可以指出我的文档/示例来执行此操作。我花了很多时间寻找解决方案,但找不到任何解决方案。
更新:我根据加里的建议尝试了以下操作
ProducerRecord<String, List<Object>> record
= new ProducerRecord<String, List<Object>>(topic,"key", objectSetup.setup());
RecordMetadata meta
= new RecordMetadata(new TopicPartition("topic",0),0,0,0,(long)1,2,1);
Mockito.when(reactiveKafkaProducerTemplate.send(topic,"key",objectSetup.setup())
.thenReturn(Mono.just(new SendResult<>(record, meta))));
Run Code Online (Sandbox Code Playgroud)
我在 .thenReturn(Mono.just(new SendResult<>(record, meta)))) 行收到以下异常。它没有提到异常中的 null 值,而且我没有看到任何 null 值。
java.lang.NullPointerException
at com.ServiceTests.cTestMethod(ServiceTests.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) …Run Code Online (Sandbox Code Playgroud) 关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。
我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。
该应用程序正在使用一个the-topic具有三个分区的主题。
该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例。
该应用程序非常简单:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {
@Bean
public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
}
}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Override
public void run(String... args) {
myConsumer().subscribe();
}
public Flux<String> …Run Code Online (Sandbox Code Playgroud)