标签: spring-kafka

Spring-kafka KafkaListener 在运行时自动启动行为

我正在使用 spring-Kafka 作为消费者应用程序运行一个 spring 应用程序。在 Kafka 监听器中,我们使用 SpEL 表达式设置了自动启动。我们在运行时更改此值以使此表达式为 false。当我们更改属性时,KafkaListener 将停止消费更多消息。

问题:这是停止轮询进一步记录的正确方法还是我们应该使用 kafkaListenerEndpointRegistry 并停止服务?这两者有什么区别?

另一个问题是,如果我有另一个应用程序使用同一消费者组,所有分区都会重新分配给该应用程序吗?

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
4642
查看次数

为什么 kafka 生产者在第一条消息上非常慢?

我正在使用卡夫卡生产者将价格发送到主题。当我发送第一条消息时,它会打印生产者配置,然后发送消息,因此发送第一条消息需要更多时间。

在收到第一条消息后,几乎只用了 1/2 毫秒就发送了一条消息。

我的问题是我们可以做一些事情以便跳过配置部分或者我们可以在发送第一条消息之前开始?

我正在我的项目中使用 spring kafka。我还阅读了其他问题,但并没有太大帮助。

应用程序.yml

server:
  port: 8081
spring:
    kafka:
      bootstrap-servers:   ***.***.*.***:9092
      producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
      
Run Code Online (Sandbox Code Playgroud)

生产者价值观:

acks = 1
batch.size = 16384
bootstrap.servers = [192.168.1.190:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = 
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
7119
查看次数

当所有代理都关闭时 Spring-Kafka Producer 重试

我正在使用 Springboot 2.3.5.RELEASE 和 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时或者在消息发送到代理之前抛出异常时重试。
以下生产者配置适用于启用重试的幂等生产者。

// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
   props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
   props.put(ProducerConfig.RETRIES_CONFIG, "10");
   props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
... other configs
Run Code Online (Sandbox Code Playgroud)

我知道只有在代理向生产者发送 ack 时出现暂时性错误时,Kafka 生产者重试才会起作用。因此,如果代理在发送消息之前就已关闭,则上述重试不起作用。因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果经纪人宕机或者出现网络故障,我不希望我的制作人停止工作

@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
        backoff = @Backoff(delayExpression = "1000")) …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-kafka

2
推荐指数
1
解决办法
5468
查看次数

使用 Spring Boot 时如何禁用 KafkaAdmin

目前我正在使用 Spring boot 2.4.0 和 spring-kafka。

我想使用消费者和生产者,但不想使用 kafka admin。

我尝试KafkaAdmin通过将“bootstrap.servers”设置为 null 来覆盖类,但不起作用。它仍然“尝试”连接以进行 KafkaAdmin 并记录错误。这不是致命的,但是有什么方法可以完全禁用kafka admin吗?

spring-kafka

2
推荐指数
1
解决办法
2709
查看次数

Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams

我正在尝试使用 Kafka Streams 做一个简单的 POC。但是,我在启动应用程序时遇到异常。我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring boot 2.3.5 Kafka 流配置

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka spring-cloud-stream-binder-kafka

2
推荐指数
1
解决办法
4425
查看次数

如何在 Gradle/Spring Boot/Kotlin 应用程序中为单元、集成和端到端测试创建不同的文件夹?

我有一个使用 Spring Boot、Gradle 和 Kotlin 构建的应用程序,它与 Kafka 和 PostgreSQL 连接。我正在使用它resources/application.yml作为配置文件。\n我想为每种测试创建一个特定的包:单元、集成和端到端,但现在我有一个仅用于单元测试的包。做这个的最好方式是什么?我正在使用 Junit5。

\n

我想要这样的东西:

\n
application/\n\xe2\x94\x9c\xe2\x94\x80 src/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 e2e/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 integration/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 main/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 test/\nbuild.gradle.kts\ngradlew\ngradlew.properties\nsettings.gradle.kts\n
Run Code Online (Sandbox Code Playgroud)\n

另外,我希望在运行单元测试时我的应用程序不要连接到 Kafka 或 Postgres,但在运行集成测试时我希望它这样做。我怎样才能做到这一点?

\n

我尝试了 Kafka 的配置,但没有成功:

\n
application/\n\xe2\x94\x9c\xe2\x94\x80 src/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 e2e/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 integration/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 main/\n\xe2\x94\x82  \xe2\x94\x9c\xe2\x94\x80 test/\nbuild.gradle.kts\ngradlew\ngradlew.properties\nsettings.gradle.kts\n
Run Code Online (Sandbox Code Playgroud)\n

我还创建了一个注释来模拟所有 Kafka 消费者和生产者(这有效,但我不想在每个测试类中添加此注释)

\n
spring\n    autoconfigure:\n        exclude: org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration\n
Run Code Online (Sandbox Code Playgroud)\n

我的消费者看起来像这样:

\n
@MockBeans(MockBean(MyConsumer::class), MockBean(MyProducer::class))\nannotation class MockKafka\n\n@SpringBootTest\n@MockKafka\nclass MyAwesomeTest { // tests here }\n
Run Code Online (Sandbox Code Playgroud)\n

我的 application.yaml 文件如下所示:

\n
@Component\nclass MyConsumer() {\n    private val logger = …
Run Code Online (Sandbox Code Playgroud)

gradle kotlin spring-boot junit5 spring-kafka

2
推荐指数
1
解决办法
1202
查看次数

Kafka Consumer 在 SSL 上抛出“OutOfMemoryError:Java 堆空间”错误

我在 Spring Boot 项目中使用Spring-Kafka 2.7.1 。

当我将其连接到配置了 SSL 的 Kafka Broker 时,它会给出如下所示的“OutofMemory”错误,即使我多次增加堆大小但无济于事。

日志如下

java.lang.OutOfMemoryError: Java heap space\
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) ~[na:na]\
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) ~[na:na]\
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar!/:na]\
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
    at …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
5211
查看次数

如何从数据库传递@KafkaListener中的groupId值?

我想将我的 spring mvc 应用程序与 Kafka 服务器连接以使用 kafka 消息。为此,我编写了 KafkaConsumer 类,如下所示。

@Service
public class KafkaConsumer {
    
    
    @KafkaListener(groupId = "my-group-id", topicPattern = "VID.*", containerFactory = SystemParameterConstants.KAFKA_LISTENER_CONTAINER_FACTORY)
    public void receivedMessage(@Payload String message) {
        logger.info("================ receivedMessage() ==================");
        logger.info("::: Message recieved from kafka ::: {}", message);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ...
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这里我有硬编码的组ID “my-group-id”,但我想从数据库读取这个groupId,以便我可以为不同的环境拥有不同的groupId。

请提出一个解决方案。谢谢!

apache-kafka spring-kafka

2
推荐指数
1
解决办法
1576
查看次数

Spring Kafka 模板 - 在 Spring Boot 启动时连接到 Kafka 主题

我已经实现了一个使用 Spring Kafka 的基本 Spring Boot 应用程序。我希望我的制作人在第一个主题被调用之前连接到 Kafka 主题.send(),但我找不到方法来做到这一点。那可能吗?

日志显示 KafkaTemplate 仅在我触发以下.send方法后连接到 Kafka 主题16:12:44

2021-11-24 16:12:12.602  INFO 63930 --- [           main] c.e.k.KafkaProducerExampleApplication    : The following profiles are active: dev
2021-11-24 16:12:13.551  INFO 63930 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559  INFO 63930 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-11-24 16:12:13.559  INFO 63930 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613  INFO 63930 …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka

2
推荐指数
1
解决办法
2203
查看次数

spring总是需要KafkaTemplate吗?

问题:springboot总是需要创建KafkaTemplate类型的bean吗?下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢

  1. 我一直在 Spring Boot 项目的某个主题中发布消息
  2. 为了创建回调机制,我使用了 org.apache.kafka.clients. Producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) 来发送消息并创建回调
  3. 我这样做的原因是因为使用 KafkaTemplate 时的可听未来仅提供失败异常(并且我想在所有用例中将回调注册为单独的可重用类)
  4. 但是,当我没有定义 KafkaTemplate 类型的 bean 并出现以下错误时,spring 无法启动
    引起:org.springframework.beans.factory.UnsatisfiedDependencyException:创建在类路径资源[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]中定义的名称为“kafkaTemplate”的bean时出错:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有“org.springframework.kafka.core.ProducerFactory”类型的合格 bean 可用:预计至少有 1 个有资格作为自动装配候选者的 bean。依赖注释:{}
        在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
        在org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration。(KafkaAnnotationDrivenConfiguration.java:90)〜[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
        在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法) ~[na:na]
        在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
        在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
        在 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
        在 …

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
6741
查看次数