我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。我给出的配置如下:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
Run Code Online (Sandbox Code Playgroud)
然而,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗?我希望该消息重试 4 次,第一次延迟 - 5 分钟后,然后第二次延迟 10 分钟后,第三次延迟 20 分钟后......
代码如下:
int i = 1;
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
String prachi = null; …Run Code Online (Sandbox Code Playgroud) 我试图理解 RetryTemplate 和 @RetryableTopic 之间的异同。
RetryTemplate 定义在 KafkaListenerContainerFactory 内部,@RetryableTopic 注解与 @KafkaListener 一起添加。这更多的是风格上的差异还是它们有两个不同的目的?
我需要开始使用卡夫卡。我很难弄清楚消费者应该收到什么:根据我的理解,我们可以通过多种方式配置消费者:
示例1:
@KafkaListener(topics = "topic_name)
public void receiveSimpleString(@Payload String message) {
}
Run Code Online (Sandbox Code Playgroud)
示例2:
@KafkaListener(topics = "topic_name)
public void receiveConsumerRecord(@Payload ConsumerRecord<String, String> message) {
}
Run Code Online (Sandbox Code Playgroud)
示例3:
@KafkaListener(topics = "topic_name)
public void receiveObject(@Payload SomeCustomClass message) {
}
Run Code Online (Sandbox Code Playgroud)
示例4:
@KafkaListener(topics = "topic_name)
public void receiveSpringMessage(@Payload org.springframework.messaging.Message<T> message) {
}
Run Code Online (Sandbox Code Playgroud)
也许还有更多的方法,但那些曾经是我在研究 kafka+spring 时最常看到的。
现在的问题是:
是否有关于消费者应该收到什么的最佳实践?不同的例子有优点/缺点吗?
java apache-kafka spring-boot kafka-consumer-api spring-kafka
在我们的应用程序中,生产者和消费者都启用了一次。
Producer是一个python组件。我们已经启用:
Consumer 是一个 Spring Boot 应用程序。我们已启用:
我们在 ConfluenceCloud 上有多分区 Kafka 主题(假设有 3 个分区)。
我们的应用程序设计如下:
问题:
我们注意到,有时同一条 Kafka 消息会在 Consumer 中被多次消费。我们通过使用以下消费者代码检测到了这一点。我们将之前消费的kafka消息Id(带偏移量)保存在Redis中,并与新消费的消息进行比较。
消费者代码:
@KafkaListener(topics = "${datalake.datasetevents.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.OFFSET) String offset,
@Payload InputEvent inputEvent, Acknowledgment acknowledgment) {
//KafkaHeaders.
Event event = new Event();
event.setCorrId(inputEvent.getCorrId());
event.setQn(inputEvent.getQn());
event.setCreatedTs(new Date());
event.setEventTs(inputEvent.getEventTs());
event.setMeta(inputEvent.getMeta() != null ? inputEvent.getMeta(): new HashMap<>());
event.setType(inputEvent.getType());
event.setUlid(key);
//detect message duplications
try {
String eventRedisKey …Run Code Online (Sandbox Code Playgroud) 当主题不存在时,我不想从我的消费者应用程序自动创建主题。
我知道这是一个 Kafka 服务器级别的配置,用于禁用自动主题创建 ( auto.create.topics.enable = false),但我无法在我的基础设施中进行此更改。
因此,我正在寻找一种方法来禁用我的消费者应用程序中的自动主题创建(使用 Spring Kafka)。
我尝试设置
spring:
kafka:
consumer:
properties:
allow.auto.create.topics: false
Run Code Online (Sandbox Code Playgroud)
但它不起作用!
似乎 Kafka 添加了此支持: https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation
有人可以帮忙吗?
基于spring-kafka的文档,我使用基于注释的@KafkaListener来配置我的消费者.
我看到的是 -
除非我将偏移量指定为零,否则在开始时,Kafka消费者将收集未来的消息,而不是现有的消息.(我知道这是预期的结果,因为我没有指定我想要的偏移量)
我在文档中看到一个选项,用于指定主题+分区组合以及零偏移,但如果我这样做 - 我必须明确指定我希望我的消费者听哪个主题.
使用上面的方法2,这就是我的消费者现在的样子 -
@KafkaListener(id = "{group.id}",
topicPartitions = {
@TopicPartition(topic = "${kafka.topic.name}",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
},
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
Acknowledgment ack) throws InterruptedException, IOException {
logger.debug("This is what we received in the Kafka Consumer = " + payload);
idService.process(payload);
ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
虽然我知道有一个选项可以指定"topicPattern"通配符或"主题"列表作为注释配置的一部分,但我没有看到我可以提供偏移值从零开始的地方列出的主题/主题模式.有没有办法将两者结合起来?请指教.
我是Kakfa Spring集成的新手.我已经实现了Kafka消息发送和One Listener,它对我来说很好.但是我希望在两个地方听到同样的信息.谁能帮我.以下是我的代码.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=sample-group
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries= 0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.template.default-topic=spring-topic
Run Code Online (Sandbox Code Playgroud)
发件人代码:
public void testSimple() {
System.out.println("Sending Topic Here");
template.send("spring-topic", 0, "foo");
template.flush();
}
Run Code Online (Sandbox Code Playgroud)
接收器:
@KafkaListener(id = "foo", topics = "spring-topic")
public void listen1(String foo) {
System.out.println("Got Notification Here");
this.latch1.countDown();
}
Run Code Online (Sandbox Code Playgroud)
可以帮助我如何在不同的地方阅读相同的消息.
apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
考虑以下代码 -
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我创建了一个消费者工厂和一个concurrentKafkaListenercontainer Factory.我没有为监听器Factory设置并发性.我有一个用@KafkaListener注释的方法
@KafkaListener(topics = "topicName")
public void listen(String message) {
System.out.println("Received Message: " + message);
Run Code Online (Sandbox Code Playgroud)
当我不设置并发属性时,Spring会创建1个消费者实例,1个kafka监听器容器属于消费者工厂中指定的组吗?
如果我将并发性更改为3,那么Spring会创建3个消费者实例,因此在配置消费者工厂和3个侦听器容器时,同一个消费者组中有3个消费者吗?
此外,根据并发性,我们假设我们现在只听一个主题,我们将有3个用@kafkalistener注释的方法,如果没有指定分区,则所有3个方法都会监听不同的分区(由kafka以循环方式提供).?
我是卡夫卡的新手,想澄清我的理解.
我们正在考虑在我们的消息传递中使用Kafka,并且我们的应用程序是使用Spring开发的。因此,我们计划使用spring-kafka。
生产者将消息作为HashMap对象放入队列。我们有JSON序列化程序,并且我们假设地图将被序列化并放入队列中。这是生产者配置。
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)
另一方面,我们有一个侦听器,用于侦听生产者发布消息的相同主题。这是使用者配置:
spring:
kafka:
consumer:
group-id: xyz
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Run Code Online (Sandbox Code Playgroud)
我们的监听器方法:
public void listener(SomeClass abx)
Run Code Online (Sandbox Code Playgroud)
我们预期json将被反序列化,并且将生成“ SomeClass”类型的对象。但是显然,它引发了反序列化异常。
我们看到的文章很少,建议这样做是:
@Bean
public ConsumerFactory<String, Car> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
Run Code Online (Sandbox Code Playgroud)
我们不想编写一些代码来创建反序列化器。有没有我们所缺少的样板东西?任何帮助将不胜感激!!
试图弄清楚我是否可以使用spring-kafka和spring-kafka-test为@KafkaListener编写单元测试。
我的侦听器类。
public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;
@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}
Run Code Online (Sandbox Code Playgroud)
我的测试班:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {
@Autowired
private MyMessageProcessor myMessageProcessor;
@Value("${kafka.topic.01}")
private String TOPIC_01;
@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;
@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
Run Code Online (Sandbox Code Playgroud)
我的测试配置类:
@Configuration
@EnableKafka …Run Code Online (Sandbox Code Playgroud)