标签: spring-kafka

ReactiveKafkaProducerTemplate 发送方法中的 Mock SenderResult

我正在尝试模拟reactiveKafkaConsumerTemplate 的发送方法。

 @Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;

 Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
        .thenReturn(createConsumerRecords(2));



Mockito.when(reactiveKafkaProducerTemplate
.send(Mockito.anyString(),Mockito.anyString(),Mockito.anyList()))
                    .thenReturn(???);
Run Code Online (Sandbox Code Playgroud)

我试图模拟reactiveProducerTemplate 的send 方法以返回SenderResult。可以这样做吗?如果是,有人可以指出我的文档/示例来执行此操作。我花了很多时间寻找解决方案,但找不到任何解决方案。

更新:我根据加里的建议尝试了以下操作

ProducerRecord<String, List<Object>> record 
= new  ProducerRecord<String, List<Object>>(topic,"key", objectSetup.setup());
RecordMetadata meta 
= new RecordMetadata(new TopicPartition("topic",0),0,0,0,(long)1,2,1);
 
Mockito.when(reactiveKafkaProducerTemplate.send(topic,"key",objectSetup.setup())
.thenReturn(Mono.just(new SendResult<>(record, meta))));
Run Code Online (Sandbox Code Playgroud)

我在 .thenReturn(Mono.just(new SendResult<>(record, meta)))) 行收到以下异常。它没有提到异常中的 null 值,而且我没有看到任何 null 值。

java.lang.NullPointerException
    at com.ServiceTests.cTestMethod(ServiceTests.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka project-reactor spring-kafka reactor-kafka

1
推荐指数
1
解决办法
1923
查看次数

动态调整消费者线程数量以适应 Kafka 分区数量

我有一个 Kafka 主题,有 50 个分区。
我的 Spring Boot 应用程序使用 Spring Kafka 来读取这些消息@KafkaListener

我的应用程序在 Kubernetes 中自动缩放的实例数量。

默认情况下,Spring Kafka 似乎为每个主题启动 1 个消费者线程。

org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
Run Code Online (Sandbox Code Playgroud)

因此,对于应用程序的唯一实例,一个线程正在读取 50 个分区。
有 2 个实例,有负载平衡,每个实例监听 25 个分区。每个实例仍然有 1 个线程。

concurrency我知道我可以使用上的参数设置线程数@KafkaListener
但这是一个固定值。

有什么方法可以告诉 Spring 动态调整消费者线程的数量以适应客户端当前正在侦听的分区数量吗?

spring apache-kafka spring-kafka

1
推荐指数
1
解决办法
2018
查看次数

SpringBoot 与 REACTOR kafka :增加 2CPUs pod 上的消息消耗吞吐量

关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。

我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。

该应用程序正在使用一个the-topic具有三个分区的主题。

该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例

该应用程序非常简单:

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
    </dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {

    @Bean
    public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
        kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
        kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
        final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
        return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
    }

}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;


    @Override
    public void run(String... args) {
        myConsumer().subscribe();
    }

    public Flux<String> …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka reactor-kafka

1
推荐指数
1
解决办法
2159
查看次数

如何将自定义标头值添加到 spring kafka 消息?

我是 springboot kafka 的新手,我在这篇文章之后创建了一个示例。

https://www.codenotfound.com/spring-kafka-boot-example.html

我目前使用的是 spring.kafka.version 1.1.6

我想在消息中添加自定义标题,以便我可以在标题中发送某些属性,例如:fileName、fileId

我发现您可以将 kafka 元数据设置为标题,但这不符合我的目的。

无论如何我可以做到这一点吗?如果可能的话,我很感激你能分享一个例子。

java apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
2997
查看次数

如何使用工厂为特定主题配置 Spring Kafka Listener?

我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用 Spring Boot。

我尝试通过“主题”键直接从属性对象读取主题。这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided.

// some class
@KafkaListener
public void listener(List<String> messages) {
  System.out.print(messages);
}

//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
  return new DefaultKafkaConsumerFactory(topicProp);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

  Properties prop = new Properties();
  prop.setProperty("topics", "my-custom-topic");

  factory.setConsumerFactory(this.consumerFactory(prop));
  return factory;
}

Is this possible?
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

0
推荐指数
1
解决办法
2876
查看次数

将 spring-kafka 更新到 2.4.0.RELEASE 时出现“java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor”

我们使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.7.RELEASE。现在,我们计划将 spring-kafka 版本更新到 2.4.0.RELEASE,并在应用程序启动时出现以下错误

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

你能建议我在这里缺少什么吗?

spring-kafka

0
推荐指数
1
解决办法
8467
查看次数

Kafka Consumer 中更好的错误处理方式

我有一个使用 spring-kafka 配置的 Springboot 应用程序,我想在其中处理收听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,则会重试 2 次,之后该消息应记录到错误文件中。我有两种可以遵循的方法:-

第一种方法(使用 SeekToCurrentErrorHandler 和 DeadLetterPublishingRecoverer):-

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

但为此我们需要添加主题(一个新的 .DLT 主题),然后我们可以将其记录到文件中。

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

java spring-boot kafka-consumer-api spring-kafka

0
推荐指数
1
解决办法
2万
查看次数

Spring kafka - 未能构建 kafka 消费者

我在使用 gradle 项目启动并运行我的 Kafka/confluent spring boot 时遇到了问题。我最初在这个测试项目中只有一个制作人,一切都运行良好。然后我添加了一个 Kafka 消费者,现在我在启动时遇到了异常。任何人都可以在这里发现问题:

首先这是堆栈跟踪

2021-01-22 19:56:08.566  WARN 61123 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573  INFO 61123 --- [           main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575  INFO 61123 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
1198
查看次数

如何获取Spring Boot默认的Kafka消费者对象

我对 Kafka 消费者的配置位于 application.yaml 文件中。我知道如果我只有 1 个消费者,我不需要创建 ConsumerFactory bean,它会默认由 spring 设置。我需要测试我的消费者,因此我需要访问测试文件中的消费者对象,并且我不想再次配置它(我想使用 application.yaml 中的默认配置)文件来创建消费者对象。这怎么可能?

java spring-boot-actuator spring-kafka spring-boot-test

0
推荐指数
1
解决办法
2854
查看次数

Spring Cloud Stream + Kafka启用.auto.commit

我有一个项目,我已升级到最新版本的 Spring Boot 和 Spring Cloud,并注意到一些意外的行为。

spring-boot:2.5.2 spring-cloud 2020.0.3

另外值得注意的是,spring-kaka由于问题https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079 ,我已降级

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

我已打开日志记录:

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="info"/>
Run Code Online (Sandbox Code Playgroud)

我的 spring yaml 如下:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: latest
      **enable-auto-commit: true**
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: myapp.serde.MyCustomDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: myapp.serde.MyCustomSerializer
    properties:
      security:
        protocol: PLAINTEXT
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
Run Code Online (Sandbox Code Playgroud)

当我启动应用程序时,我看到以下输出:

[2021-07-09 21:41:42,310] [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka

0
推荐指数
1
解决办法
4558
查看次数

CommonErrorHandler 不存在于 kafka spring 中?

我创建了一个简单的卡夫卡消费者

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
}
Run Code Online (Sandbox Code Playgroud)

这是卡夫卡消费者

@Component
public class KafkaConsumer {

@KafkaListener(topics = "NewTopic", groupId = "group_id")
public void consume(String message) {
    System.out.println("message = " + message);
}
}
Run Code Online (Sandbox Code Playgroud)

当我运行应用程序时出现以下错误

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception …
Run Code Online (Sandbox Code Playgroud)

spring spring-mvc apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
9597
查看次数

Apache Kafka 和 GCP PubSub 之间有什么区别?

Apache Kafka 和 GCP PubSub 之间有什么区别?何时使用kafka,何时使用pubsub。

publish-subscribe apache-kafka google-cloud-pubsub spring-kafka

-2
推荐指数
1
解决办法
4082
查看次数