标签: spring-kafka

如何让 Java 中的 Kafka 通配符消费者更快地了解新主题?

我的项目通过为每个租户和通配符消费者使用单独的主题来在 Kafka 级别实现多租户,即发布到主题“message.tenant1”或“message.tenant99”并从主题“message.*”消费

这工作正常,直到我们想要动态添加新租户,即添加“message.tenant100”主题。在重新启动之前,通配符使用者不会看到新主题。

有没有办法让通配符消费者在不重启整个应用程序的情况下看到新主题?

我们正在使用 Spring,但如果无法通过 Spring 获得解决方案,那么我们可以使用其他方法。

编辑:事实证明这确实有效,但在重新平衡之前有一个粗略的延迟 5 分钟。5 分钟对于我们的生产来说可能太长了。我尝试将 'leader.imbalance.check.interval.seconds' 设置为较低的值,但这似乎没有任何效果。

我如何配置或告诉 Kafka 尽快重新平衡?我希望重新平衡是一项昂贵的操作,而不是您想要经常做的操作。

apache-kafka kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
1158
查看次数

Spring Cloud Stream Kafka - 找不到 Serde 类:org.apache.kafka.common.serialization.Serde$StringSerde

我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流来推送原始数据进行处理。但是当我尝试按键处理事件计数的流Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde时,我在运行应用程序时遇到异常。我检查了我的项目包含的库,我可以找到这个Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!

下面是我的源文件。

com.pgp.learn.kafka.analytics.AnalyticsApplication

package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class  AnalyticsApplication {

    public static void main(String[] …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1019
查看次数

spring 云流-消费群绑定

我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent
Run Code Online (Sandbox Code Playgroud)

我的 Spring Boot 应用程序

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @StreamListener(value = "inEvent")
    public void getEvent(Event event){
        System.out.println(event.name);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的输入输出通道接口

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();
    @Output("outEvent")
    MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)

我的控制台日志--

:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 …

spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka

1
推荐指数
1
解决办法
2759
查看次数

docker 中的 truststore 问题路径与 spring boot 和 kafka

我在使用 docker 容器部署我的应用程序 spring 启动应用程序时遇到技术问题。

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: 无法加载 SSL 密钥库 /tmp/tomcat-docbase.4737956707529585395.8080/deployments/app/certs/kafka.truststore.jks

/deployments 是我在 dockerfile 中配置的工作目录

我发现它选择 tmp/tomcat docbase 很奇怪,因为在为其他信任库配置时,它进入了正确的位置。这是我的 application.yaml 里面的内容

spring:
   kafka:
     bootstrap-servers:localhost:9092
     ssl:
       truststore-location: /deployments/app/certs/kafka-truststore.jks
       truststore-password: test
     consumer:
      group-id: consumerid
server:
   ssl:
    enabled: false
    key-store: /deployments/app/certs/dp--dev.jks
    key-store-password: changeit
    trust-store: /deployments/app/certs/ol-truststore-dev.jks
    trust-store-password: test
Run Code Online (Sandbox Code Playgroud)

它是我遗漏的东西还是与我正在使用的 kafka springframework lib 有关?

apache-kafka docker spring-boot spring-kafka

1
推荐指数
1
解决办法
2409
查看次数

创建没有注释和 Spring Boot 的 KafkaListener

我正在尝试在不使用 @KafkaListener 注释的情况下为主题创建 Kafka Consumer。我想这样做是因为我试图在不使用 spring boot 的情况下根据 application.properties 动态创建侦听器。

我认为最好的方法是手动创建一个 KafkaListenerContainerFactory 有人可以提供一个如何在它自己的类中执行此操作的示例。

spring java-8 spring-kafka

1
推荐指数
1
解决办法
5073
查看次数

Java在源代码中设置Kafka保留时间

我有以下问题。我需要在 Kafka 中为某些选定的主题设置保留时间。我找到了一个解决方案,可以使用以下命令进行设置:

kafka-topics --zookeeper localhost:2181 --alter --topic topic-name --config retention.ms=-1
Run Code Online (Sandbox Code Playgroud)

我检查了 Kafka 的 Web UI 并确认它已更改。

如果可能,我想自己在 Java 中设置保留时间,但似乎找不到合适的类/配置来设置时间。我以为我可以在 ProducerConfig 类中获取有关保留的信息,但在那里找不到。

甚至可以在 Java 中设置保留时间,如果可能,我该如何完成?

提前致谢!

java apache-kafka spring-kafka

1
推荐指数
1
解决办法
1290
查看次数

如何在 Spring Boot 测试中禁用`@EnableKafka`?

我想运行我的集成测试,但我不知道如何禁用@EnableKafka.

我的应用程序看起来像这样:

@SpringBootApplication
@EnableKafka
public class MyApplication {
Run Code Online (Sandbox Code Playgroud)

java spring spring-boot spring-kafka spring-boot-test

1
推荐指数
3
解决办法
3740
查看次数

在 Spring 启动应用程序中为 kafka 使用者设置重试策略时,何时使用 ExponentialBackOffPolicy 与 FixedBackOffPolicy?

在 Spring 启动应用程序中为 kafka 使用者设置重试策略时,何时使用 ExponentialBackOffPolicy 与 FixedBackOffPolicy?

我将FixedBackOffPolicy视为BackOffPolicy 的一种实现,它在继续之前暂停一段固定的时间,而ExponentialBackOffPolicy 则视为 BackOffPolicy 的一种实现,它增加了给定集中每次重试尝试的回退时间。

除此之外, FixedBackOffPolicy 扩展了 StatelessBackOffPolicy 而 ExponentialBackOffPolicy 没有。在这方面,请帮助我理解,什么是更喜欢一个而不是另一个的合适用例?

apache-kafka spring-kafka

1
推荐指数
1
解决办法
901
查看次数

停止使用 Stream 侦听器的消息

我正在寻找一种方法来停止使用流侦听器消费消息。

@StreamListener(MBinding.M_INPUT)
    public void consumeMessage(Message<MerchantEvent> message) {
    //handle when receive message
 }

Run Code Online (Sandbox Code Playgroud)
cloud:
        stream:
            bindings:
                MInput:
                    destination: topicName
                    group: groupName
Run Code Online (Sandbox Code Playgroud)

我用谷歌搜索过,但现在仍然不知道如何停止消费。有知道的人吗?

spring-boot spring-kafka

1
推荐指数
1
解决办法
1465
查看次数

Spring 和 Kafka 流 - 如何使用查询 API

我是 kafka 和 kafka 流的新手。我有一个与 kafka 生产者、消费者、KStream 和 KTable 一起工作的基本 Spring 服务。现在,我想检查我的 KTable 记录,因此为了实现它,我正在尝试使用 Kafka Query API。

这可以通过以下方式实现(没有 Spring 集成):

KafkaStreams streams = new KafkaStreams(topology, config);
// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");
Run Code Online (Sandbox Code Playgroud)

现在,我尝试使用基于 Spring 的 InteractiveQueryService 来进行查询……但是我在 Spring 启动中遇到了一些依赖问题。

在 Spring 中使用 kafka 查询 API 的最佳方法是什么?

我的服务中的 Spring kafka 配置如下所示:

@Bean("streamsBuilder")
public StreamsBuilderFactoryBean recordsStreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    // set some properties …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1038
查看次数