我是Spring和Kafka的新手.我正在研究一个用例[使用SpringBoot-kafka],允许用户在运行时创建kafka主题.spring应用程序应该在运行时以编程方式订阅这些主题.到目前为止我所知道的是,Kafka听众是设计时间,因此需要在启动前指定主题.有没有办法动态订阅SpringBoot-Kafka集成中的kafka主题?
推荐这个 https://github.com/spring-projects/spring-kafka/issues/132
我计划实现的当前方法是,不要使用Spring-Kafka集成而不是自己实现Kafka消费者[使用java代码]如此处提到的 spring boot kafka consumer - 如何正确地使用来自spring boot的kafka消息
spring spring-mvc spring-boot kafka-consumer-api spring-kafka
我可以请与社区核实听取多个主题的最佳方式是什么,每个主题包含不同类的消息?
在过去的几天里,我一直在玩Spring Kafka.到目前为止我的思考过程:
因为在初始化KafkaListenerContainerFactory时需要将反序列化器传递给DefaultKafkaConsumerFactory.这似乎表明,如果我需要多个容器,每个反序列化一个不同类型的消息,我将无法使用@EnableKafka和@KafkaListener注释.
这使我认为这样做的唯一方法是实例化多个KafkaMessageListenerContainer.
鉴于KafkaMessageListenerContainers是单线程的,我需要同时监听多个主题,我真的应该使用多个ConcurrentKafkaMessageListenerContainers.
我会在这里走上正轨吗?有一个更好的方法吗?
谢谢!
我试图将多个主题加载到一个主题中,@KafkaListener
但遇到了麻烦,因为我相信它正在寻找一个常量值,但是topics
从application.yml
文件中初始化变量会导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者指导我如何将多个 Kafka 主题加载到单个 KafkaListener 中。
@KafkaListener
通过将它们传递到逗号分隔的对象中,我可以收听相同的多个主题,如下所示:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
Run Code Online (Sandbox Code Playgroud)
我意识到我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。
我相信可能存在的问题是@KafkaListener 需要接受一个常量值,而我无法将注释定义为常量,有什么办法可以解决这个问题吗?
KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
Run Code Online (Sandbox Code Playgroud)
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events, …
Run Code Online (Sandbox Code Playgroud) 我在集成测试中使用kafkaEmbedded,并且得到FileNotFoundException:
java.io.FileNotFoundException: /tmp/kafka-7785736914220873149/replication-offset-checkpoint.tmp
at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:43) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:58) ~[kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1118) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) [scala-library-2.11.11.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) [scala-library-2.11.11.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) [scala-library-2.11.11.jar:na]
at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1115) [kafka_2.11-0.11.0.0.jar:na]
at kafka.server.ReplicaManager$$anonfun$1.apply$mcV$sp(ReplicaManager.scala:211) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) [kafka_2.11-0.11.0.0.jar:na]
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) [kafka_2.11-0.11.0.0.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_141]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_141]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]
Run Code Online (Sandbox Code Playgroud)
我的测试成功通过,但是在构建结束时出现此错误
经过数小时的研究,我发现了这一点:
如果kafka broker在单元测试结束时运行,它将尝试在已删除的dir中写入/读取数据,并产生不同的FileNotFound异常。
我的配置类:
@Configuration
public …
Run Code Online (Sandbox Code Playgroud) 嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.
我现在做生产者和消费者我想实时流式传输.
我看到 spring Kafka 代码,我有一些疑问:
如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。
MessageListenerContainer 是消费者的意思吗?
如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。
轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?
请帮忙。
我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?
\n public class KafkaHealthIndicator implements HealthIndicator {\n private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);\n\n private KafkaTemplate<String, String> kafka;\n\n public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {\n this.kafka = kafka;\n }\n\n @Override\n public Health health() {\n try {\n kafka.send("kafka-health-indicator", "\xe2\x9d\xa5").get(100, TimeUnit.MILLISECONDS);\n } catch (InterruptedException | ExecutionException | TimeoutException e) {\n return Health.down(e).build();\n }\n return Health.up().build();\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n 我正在研究Spring Boot Kafka
Confluence 的示例并运行简单的生产者示例并出现以下错误。我使用 Windows 机器并在 Windows 上安装了 ubunt 14.04 LTS。
注意 - 即使我使用 localhost,它仍然无法通过代码工作。
[2m2021-05-30 21:14:23.916[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.s.i.endpoint.EventDrivenConsumer [0;39m [2m:[0;39m started bean '_org.springframework.integration.errorLogger'
[2m2021-05-30 21:14:23.928[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mc.e.demo.HelloWorldKafkaApplication [0;39m [2m:[0;39m Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
[2m2021-05-30 21:14:23.931[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mcom.example.demo.KafkaProducerService [0;39m [2m:[0;39m Producing Message- Key: 1, Value: {"name": "John", "age": 48}
[2m2021-05-30 21:14:23.970[0;39m [32m INFO[0;39m [35m10300[0;39m [2m---[0;39m [2m[ main][0;39m [36mo.a.k.clients.producer.ProducerConfig …
Run Code Online (Sandbox Code Playgroud) 在 Spring Boot 2.6.4 上,此方法已被弃用。
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
// deprecated
factory.setErrorHandler(new GlobalErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
全局错误处理程序类
public class GlobalErrorHandler implements ConsumerAwareErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
// my custom global logic (e.g. notify ops team via slack)
}
}
Run Code Online (Sandbox Code Playgroud)
这个的替代样本是什么?文档说我应该使用setCommonErrorHandler
,但是如何实现该CommonErrorHandler
接口,因为那里没有可以重写的方法。
要点是,我必须根据特定条件向运营团队发送松弛通知(消息 tpye,可在 kafka 消息头中找到)
这并不是阻塞,只是一条烦人的已弃用消息。谢谢
因此,我正在阅读Spring kafka文档,并遇到了Producer Listener。这就是Spring Kafka文档所说的-
“可选地,您可以配置带有ProducerListener的KafkaTemplate来获取发送(成功或失败)结果的异步回调,而不必等待Future完成。”
他们还指定了接口-
public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value,
RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value,
Exception exception);
boolean isInterestedInSuccess();
}
Run Code Online (Sandbox Code Playgroud)
因此,我的理解是,如果您想对消息的成功和失败做一些事情,请实现ProducerListener接口,并将其注册到KafkaTemplate。它是异步的,因此您不必等待将来完成,就可以知道发送操作的结果。
在此下方大约3句话,它提到您还可以使用KakfaTemplate的send方法返回的ListenableFuture添加回调。这也是异步的。
future.addCallback(new ListenableFutureCallback<SendResult<Integer,
String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
Run Code Online (Sandbox Code Playgroud)
所以我想知道两者之间的确切区别是什么,因为它们都是异步的。是onSuccess和onFailure / onError方法中接收到的数据之间的区别。或者是在将回调函数添加到ListenableFuture之前(因为将来在不阻塞it-get()方法的情况下不知道异步计算的结果)之前,已经开发了在KafkaTemplate中添加ProducerListener的功能(反之亦然) 。因此,仅为了确保向后兼容,两者都可以继续使用。使用一种方法相对于其他方法是否具有任何性能优势。
先感谢您。