我正在尝试模拟reactiveKafkaConsumerTemplate 的发送方法。
@Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;
Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
.thenReturn(createConsumerRecords(2));
Mockito.when(reactiveKafkaProducerTemplate
.send(Mockito.anyString(),Mockito.anyString(),Mockito.anyList()))
.thenReturn(???);
Run Code Online (Sandbox Code Playgroud)
我试图模拟reactiveProducerTemplate 的send 方法以返回SenderResult。可以这样做吗?如果是,有人可以指出我的文档/示例来执行此操作。我花了很多时间寻找解决方案,但找不到任何解决方案。
更新:我根据加里的建议尝试了以下操作
ProducerRecord<String, List<Object>> record
= new ProducerRecord<String, List<Object>>(topic,"key", objectSetup.setup());
RecordMetadata meta
= new RecordMetadata(new TopicPartition("topic",0),0,0,0,(long)1,2,1);
Mockito.when(reactiveKafkaProducerTemplate.send(topic,"key",objectSetup.setup())
.thenReturn(Mono.just(new SendResult<>(record, meta))));
Run Code Online (Sandbox Code Playgroud)
我在 .thenReturn(Mono.just(new SendResult<>(record, meta)))) 行收到以下异常。它没有提到异常中的 null 值,而且我没有看到任何 null 值。
java.lang.NullPointerException
at com.ServiceTests.cTestMethod(ServiceTests.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) …Run Code Online (Sandbox Code Playgroud) 我有一个 Kafka 主题,有 50 个分区。
我的 Spring Boot 应用程序使用 Spring Kafka 来读取这些消息@KafkaListener
我的应用程序在 Kubernetes 中自动缩放的实例数量。
默认情况下,Spring Kafka 似乎为每个主题启动 1 个消费者线程。
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
Run Code Online (Sandbox Code Playgroud)
因此,对于应用程序的唯一实例,一个线程正在读取 50 个分区。
有 2 个实例,有负载平衡,每个实例监听 25 个分区。每个实例仍然有 1 个线程。
concurrency我知道我可以使用上的参数设置线程数@KafkaListener。
但这是一个固定值。
有什么方法可以告诉 Spring 动态调整消费者线程的数量以适应客户端当前正在侦听的分区数量吗?
关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。
我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。
该应用程序正在使用一个the-topic具有三个分区的主题。
该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例。
该应用程序非常简单:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {
@Bean
public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
}
}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Override
public void run(String... args) {
myConsumer().subscribe();
}
public Flux<String> …Run Code Online (Sandbox Code Playgroud) 我是 springboot kafka 的新手,我在这篇文章之后创建了一个示例。
https://www.codenotfound.com/spring-kafka-boot-example.html
我目前使用的是 spring.kafka.version 1.1.6
我想在消息中添加自定义标题,以便我可以在标题中发送某些属性,例如:fileName、fileId
我发现您可以将 kafka 元数据设置为标题,但这不符合我的目的。
无论如何我可以做到这一点吗?如果可能的话,我很感激你能分享一个例子。
我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用 Spring Boot。
我尝试通过“主题”键直接从属性对象读取主题。这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided.
// some class
@KafkaListener
public void listener(List<String> messages) {
System.out.print(messages);
}
//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
return new DefaultKafkaConsumerFactory(topicProp);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
Properties prop = new Properties();
prop.setProperty("topics", "my-custom-topic");
factory.setConsumerFactory(this.consumerFactory(prop));
return factory;
}
Is this possible?
Run Code Online (Sandbox Code Playgroud) 我们使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.7.RELEASE。现在,我们计划将 spring-kafka 版本更新到 2.4.0.RELEASE,并在应用程序启动时出现以下错误
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
你能建议我在这里缺少什么吗?
我有一个使用 spring-kafka 配置的 Springboot 应用程序,我想在其中处理收听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,则会重试 2 次,之后该消息应记录到错误文件中。我有两种可以遵循的方法:-
第一种方法(使用 SeekToCurrentErrorHandler 和 DeadLetterPublishingRecoverer):-
@Autowired
KafkaTemplate<String,Object> template;
@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
Map<String, Object> config = appProperties.getSource()
.getProperties();
ConcurrentKafkaListenerContainerFactory<K, V> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".DLT", r.partition());
}
});
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
factory.setErrorHandler(errorHandler);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
但为此我们需要添加主题(一个新的 .DLT 主题),然后我们可以将其记录到文件中。
@Bean
public KafkaAdmin admin() {
Map<String, Object> …Run Code Online (Sandbox Code Playgroud) 我在使用 gradle 项目启动并运行我的 Kafka/confluent spring boot 时遇到了问题。我最初在这个测试项目中只有一个制作人,一切都运行良好。然后我添加了一个 Kafka 消费者,现在我在启动时遇到了异常。任何人都可以在这里发现问题:
首先这是堆栈跟踪
2021-01-22 19:56:08.566 WARN 61123 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573 INFO 61123 --- [ main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575 INFO 61123 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576 INFO 61123 --- [ main] com.zaxxer.hikari.HikariDataSource …Run Code Online (Sandbox Code Playgroud) 我对 Kafka 消费者的配置位于 application.yaml 文件中。我知道如果我只有 1 个消费者,我不需要创建 ConsumerFactory bean,它会默认由 spring 设置。我需要测试我的消费者,因此我需要访问测试文件中的消费者对象,并且我不想再次配置它(我想使用 application.yaml 中的默认配置)文件来创建消费者对象。这怎么可能?
我有一个项目,我已升级到最新版本的 Spring Boot 和 Spring Cloud,并注意到一些意外的行为。
spring-boot:2.5.2 spring-cloud 2020.0.3
另外值得注意的是,spring-kaka由于问题https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079 ,我已降级
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
我已打开日志记录:
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="info"/>
Run Code Online (Sandbox Code Playgroud)
我的 spring yaml 如下:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: latest
**enable-auto-commit: true**
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: myapp.serde.MyCustomDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: myapp.serde.MyCustomSerializer
properties:
security:
protocol: PLAINTEXT
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
bindings:
myInboundRoute:
destination: some-destination.1
group: a-custom-group
myOutboundRoute:
destination: some-destination.2
Run Code Online (Sandbox Code Playgroud)
当我启动应用程序时,我看到以下输出:
[2021-07-09 21:41:42,310] [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = …Run Code Online (Sandbox Code Playgroud) spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka
我创建了一个简单的卡夫卡消费者
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
这是卡夫卡消费者
@Component
public class KafkaConsumer {
@KafkaListener(topics = "NewTopic", groupId = "group_id")
public void consume(String message) {
System.out.println("message = " + message);
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行应用程序时出现以下错误
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception …Run Code Online (Sandbox Code Playgroud) Apache Kafka 和 GCP PubSub 之间有什么区别?何时使用kafka,何时使用pubsub。
publish-subscribe apache-kafka google-cloud-pubsub spring-kafka
spring-kafka ×12
apache-kafka ×8
java ×6
spring-boot ×5
spring ×3
spring-cloud-stream-binder-kafka ×1
spring-mvc ×1