标签: spring-kafka

如何在springboot中动态地为每个主题创建单独的Kafka监听器?

我是Spring和Kafka的新手.我正在研究一个用例[使用SpringBoot-kafka],允许用户在运行时创建kafka主题.spring应用程序应该在运行时以编程方式订阅这些主题.到目前为止我所知道的是,Kafka听众是设计时间,因此需要在启动前指定主题.有没有办法动态订阅SpringBoot-Kafka集成中的kafka主题?

推荐这个 https://github.com/spring-projects/spring-kafka/issues/132

我计划实现的当前方法是,不要使用Spring-Kafka集成而不是自己实现Kafka消费者[使用java代码]如此处提到的 spring boot kafka consumer - 如何正确地使用来自spring boot的kafka消息

spring spring-mvc spring-boot kafka-consumer-api spring-kafka

8
推荐指数
1
解决办法
4743
查看次数

Spring Kafka:ApplicationContext中不同对象的多个侦听器

我可以请与社区核实听取多个主题的最佳方式是什么,每个主题包含不同类的消息?

在过去的几天里,我一直在玩Spring Kafka.到目前为止我的思考过程:

  • 因为在初始化KafkaListenerContainerFactory时需要将反序列化器传递给DefaultKafkaConsumerFactory.这似乎表明,如果我需要多个容器,每个反序列化一个不同类型的消息,我将无法使用@EnableKafka和@KafkaListener注释.

  • 这使我认为这样做的唯一方法是实例化多个KafkaMessageListenerContainer.

  • 鉴于KafkaMessageListenerContainers是单线程的,我需要同时监听多个主题,我真的应该使用多个ConcurrentKafkaMessageListenerContainers.

我会在这里走上正轨吗?有一个更好的方法吗?

谢谢!

java apache-kafka spring-kafka

8
推荐指数
3
解决办法
9528
查看次数

启用 @KafkaListener 从 application.yml 文件中获取可变主题名称

我试图将多个主题加载到一个主题中,@KafkaListener但遇到了麻烦,因为我相信它正在寻找一个常量值,但是topicsapplication.yml文件中初始化变量会导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者指导我如何将多个 Kafka 主题加载到单个 KafkaListener 中。

@KafkaListener通过将它们传递到逗号分隔的对象中,我可以收听相同的多个主题,如下所示:

@KafkaListener(topics = {
           "flight-events",
           "flight-time-events",
           "service-events",
           "flight-delay-events"
   })
Run Code Online (Sandbox Code Playgroud)

我意识到我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。

我相信可能存在的问题是@KafkaListener 需要接受一个常量值,而我无法将注释定义为常量,有什么办法可以解决这个问题吗?

KafkaWebSocketConnector.java

@Component
public class KafkaWebSocketConnector
{


   @Value("${spring.kafka.topics}")
   private String[] topics;

   @KafkaListener(topics = topics)
   public void listen(ConsumerRecord<?, Map<String, String>> message)
   {
      log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
      String dest = "/" + message.topic();
      log.info("destination = {}", dest);
      log.info("msg: {}", message);
      messageTemplate.convertAndSend(dest, message.value());
   }
}
Run Code Online (Sandbox Code Playgroud)

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: kafka-websocket-connector
    topics: flight-events,
      flight-time-events, …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-kafka

8
推荐指数
1
解决办法
1万
查看次数

嵌入的kafka:java.io.FileNotFoundException:/tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp

我在集成测试中使用kafkaEmbedded,并且得到FileNotFoundException:

java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp 
at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:43) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:58) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1118) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.11.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) [scala-library-2.11.11.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.11.jar:na]
at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:211) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) [kafka_2.11-0.11.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_141]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]
Run Code Online (Sandbox Code Playgroud)

我的测试成功通过,但是在构建结束时出现此错误

经过数小时的研究,我发现了这一点:

  • kafka TestUtils.tempDirectory方法用于为嵌入式kafka代理创建临时目录。它还注册关闭钩子,当JVM退出时,该钩子将删除此目录。
  • 当单元测试完成执行时,它将调用System.exit,该系统依次执行所有已注册的关闭挂钩

如果kafka broker在单元测试结束时运行,它将尝试在已删除的dir中写入/读取数据,并产生不同的FileNotFound异常。

我的配置类:

@Configuration
public …
Run Code Online (Sandbox Code Playgroud)

cassandra spring-boot spring-kafka

8
推荐指数
1
解决办法
3325
查看次数

Kafka Streams with Spring Boot

嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.

我现在做生产者和消费者我想实时流式传输.

apache-kafka spring-boot apache-kafka-streams spring-kafka

8
推荐指数
1
解决办法
1万
查看次数

Spring Kafka MessageListenerContainer

我看到 spring Kafka 代码,我有一些疑问:

  1. 如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。

  2. MessageListenerContainer 是消费者的意思吗?

  3. 如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。

  4. 轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?

请帮忙。

apache-kafka kafka-consumer-api spring-kafka

8
推荐指数
1
解决办法
5363
查看次数

Spring Boot:Kafka健康指标

我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?

\n
   public class KafkaHealthIndicator implements HealthIndicator {\n   private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);\n\n   private KafkaTemplate<String, String> kafka;\n\n   public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {\n   this.kafka = kafka;\n   }\n\n  @Override\n  public Health health() {\n  try {\n     kafka.send("kafka-health-indicator", "\xe2\x9d\xa5").get(100, TimeUnit.MILLISECONDS);\n  } catch (InterruptedException | ExecutionException | TimeoutException e) {\n      return Health.down(e).build();\n  }\n  return Health.up().build();\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

java spring apache-kafka spring-boot spring-kafka

8
推荐指数
2
解决办法
1万
查看次数

无法建立与节点 -1 (/127.0.0.1:9092) 的连接。经纪人可能不可用

我正在研究Spring Boot KafkaConfluence 的示例并运行简单的生产者示例并出现以下错误。我使用 Windows 机器并在 Windows 上安装了 ubunt 14.04 LTS。

注意 - 即使我使用 localhost,它仍然无法通过代码工作。

[2m2021-05-30 21:14:23.916[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[           main][0;39m [36mo.s.i.endpoint.EventDrivenConsumer      [0;39m [2m:[0;39m started bean '_org.springframework.integration.errorLogger'
[2m2021-05-30 21:14:23.928[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[           main][0;39m [36mc.e.demo.HelloWorldKafkaApplication     [0;39m [2m:[0;39m Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
[2m2021-05-30 21:14:23.931[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[           main][0;39m [36mcom.example.demo.KafkaProducerService   [0;39m [2m:[0;39m Producing Message- Key: 1, Value: {"name": "John", "age": 48}
[2m2021-05-30 21:14:23.970[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[           main][0;39m [36mo.a.k.clients.producer.ProducerConfig …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

8
推荐指数
2
解决办法
2万
查看次数

Spring kafka setErrorHandler 已弃用替换(引导 2.6.4)

在 Spring Boot 2.6.4 上,此方法已被弃用。

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

全局错误处理程序类

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}
Run Code Online (Sandbox Code Playgroud)

这个的替代样本是什么?文档说我应该使用setCommonErrorHandler,但是如何实现该CommonErrorHandler接口,因为那里没有可以重写的方法。

要点是,我必须根据特定条件向运营团队发送松弛通知(消息 tpye,可在 kafka 消息头中找到)

这并不是阻塞,只是一条烦人的已弃用消息。谢谢

apache-kafka spring-kafka

8
推荐指数
2
解决办法
1万
查看次数

Spring Kafka-使用Producer Listener配置KafkaTemplate与使用Listenable Future注册回调之间的区别

因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-

“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”

他们还指定了接口-

     public interface ProducerListener<K, V> {

      void onSuccess(String topic, Integer partition, K key, V value, 
      RecordMetadata recordMetadata);

      void onError(String topic, Integer partition, K key, V value, 
      Exception exception);

      boolean isInterestedInSuccess();

  }
Run Code Online (Sandbox Code Playgroud)

因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。

在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。

 future.addCallback(new ListenableFutureCallback<SendResult<Integer, 
   String>>() {

 @Override
 public void onSuccess(SendResult<Integer, String> result) {
    ...
  }

  @Override
  public void onFailure(Throwable ex) {
    ...
  }

  });
Run Code Online (Sandbox Code Playgroud)

所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。

先感谢您。

java spring spring-kafka

7
推荐指数
1
解决办法
2071
查看次数