标签: spring-kafka

为什么Kafka消费者需要很长时间才能开始消费?

我们启动一个Kafka消费者,聆听可能尚未创建的主题(尽管已启用主题自动创建).

此后不久,制作人就该主题发布消息.

但是,消费者需要一些时间才能注意到这一点:确切地说是5分钟.此时,消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了这个群体.查看消费者与kafka日志的时间戳,此过程在消费者端实例化.

我想这是预期的行为,但我想理解这一点.这实际上是一个重新平衡(从0到1分区)?如果我们提前创建主题,这不会发生吗?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
4574
查看次数

Kafka消费者异常和抵消提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.

我想知道是否有人能够提供帮助:

  1. 分享围绕Kafka消费者在发生故障时应该做的最佳实践
  2. 帮助我了解AckMode Record如何工作,以及如何在侦听器方法中抛出异常时阻止对Kafka偏移队列的提交.

2的代码示例如下:

鉴于AckMode设置为RECORD,根据文档:

处理记录后,侦听​​器返回时提交偏移量.

我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.

我的配置:

    private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我的代码:

@Component
public class KafkaMessageListener{
    @KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
6823
查看次数

带有spring-kafka 2.1.0和SpringBoot 1.5.9的Kafka Consumer上的java.lang.NoSuchMethodError

我正在尝试使用SpringBoot(1.5.9)和Spring-kafka(2.1.0)来设置Kafka Consumer.但是,当我启动我的应用程序时,我在Kafka MessagingMessageListenerAdapter上获得了java.lang.NoSuchMethodError:org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V.我尝试使用Spring-Kafka(1.2.0)并且错误消失了.有没有其他人遇到这个版本不兼容?

这是我的配置类

@EnableKafka
@Configuration
public class ImporterConfigs{

static Logger logger = Logger.getLogger(ImporterConfigs.class);

@Value("${kafka.bootstrap-servers}")
private static String bootstrapServers;

@Bean
public Map<String, Object> consumerKafkaConfigs() {
  Map<String, Object> props = new HashMap<>();
  // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  // allows a pool of processes to divide the work of consuming and processing records
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

  return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
  return new …
Run Code Online (Sandbox Code Playgroud)

spring-kafka

13
推荐指数
1
解决办法
1万
查看次数

使用Spring Embedded Kafka测试@KafkaListener

我正在尝试为使用Spring Boot 2.x开发的Kafka监听器编写单元测试.作为一个单元测试,我不想启动一个完整的Kafka服务器作为Zookeeper的一个实例.所以,我决定使用Spring Embedded Kafka.

我的听众的定义非常基本.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}
Run Code Online (Sandbox Code Playgroud)

此外,latch在接收消息后验证计数器等于零的测试非常容易.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka spring-boot-test

12
推荐指数
1
解决办法
7941
查看次数

为什么我会收到此日志 - 重置上次看到的纪元

我有一个 Spring Boot 应用程序,它将多个主题发送到 kafka。

在我用来发送的服务中,我已经自动连接了这个。

  private final KafkaTemplate<String, Object> kafkaTemplate;
Run Code Online (Sandbox Code Playgroud)

当我使用此 kafkaTemplate 发送到多个主题时,我会在日志中收到以下信息。

[Producer clientId= Producer-1] 由于关联的 topicId 从 null 更改为 J09iQeooQtOuAyEfntux_g,因此将分区 ***** 的最后一次看到的纪元重置为 87

发送。我创建了一个 GenericRecord

GenericRecord kafkaValue = new GenericData.Record(schema);
Run Code Online (Sandbox Code Playgroud)

然后我在 genericRecord 上设置值,准备好后我用这一行发送。

kafkaStringTemplate.send(new ProducerRecord<>(sendMessageConfiguration.getTopicName(), textKey, kafkaValue));
Run Code Online (Sandbox Code Playgroud)

这个警告重要吗?如果是的话我做错了什么?在我看来,所有消息都已发送。

spring-kafka

12
推荐指数
0
解决办法
2104
查看次数

如何确认spring kafka中的当前偏移量以进行手动提交

我第一次使用Spring Kafka而且我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交.如果我的消费者配置或监听器代码中缺少任何内容,请告诉我.或者是否有其他方法根据条件处理确认偏移.在这里,我正在寻找解决方案,如果没有手动提交/确认偏移,它应该由消费者选择相同的消息/偏移.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Value(value = "${kafka.groupId}")
private String groupId;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
        props)); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

11
推荐指数
3
解决办法
2万
查看次数

Spring Kafka Producer没有发送到Kafka 1.0.0(Magic v1不支持记录头)

我正在使用这个docker-compose设置在本地设置Kafka:https://github.com/wurstmeister/kafka-docker/

docker-compose up 工作正常,通过shell创建主题工作正常.

现在我尝试通过连接到Kafka spring-kafka:2.1.0.RELEASE

启动Spring应用程序时,它会打印正确版本的Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
Run Code Online (Sandbox Code Playgroud)

我尝试发送这样的消息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Run Code Online (Sandbox Code Playgroud)

在客户端发送失败

UnknownServerException: The server experienced an unexpected error when processing the request
Run Code Online (Sandbox Code Playgroud)

在服务器控制台中,我收到消息Magic v1不支持记录头

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Run Code Online (Sandbox Code Playgroud)

谷歌搜索建议版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0在类路径中).

有线索吗?谢谢!

编辑:我缩小了问题的根源.发送纯字符串有效,但通过JsonSerializer发送Json会导致给定的问题.这是我的生产者配置的内容:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-boot docker-compose spring-kafka

11
推荐指数
2
解决办法
9541
查看次数

spring boot中创建KafkaTemplate的正确方法

我尝试在Spring Boot 应用程序中配置apache kafka。我阅读了此文档并按照以下步骤操作:

1)我将此行添加到aplication.yaml

spring:
  kafka:
    bootstrap-servers: kafka_host:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Run Code Online (Sandbox Code Playgroud)

2)我创建新主题:

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic("new-topic", 5, (short) 1);
    }
Run Code Online (Sandbox Code Playgroud)

现在我想使用KafkaTemplate

private final KafkaTemplate<String, byte[]> kafkaTemplate;

public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}
Run Code Online (Sandbox Code Playgroud)

但 Intellij IDE 强调:

在此处输入图片说明

为了解决这个问题,我需要创建 bean:

@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}
Run Code Online (Sandbox Code Playgroud)

并传递给构造函数属性greetingProducerFactory()

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka spring-boot spring-kafka

11
推荐指数
2
解决办法
9963
查看次数

当所有消费者都被删除时,Kafka 消费者组会发生什么

我有一个带有自动缩放器的服务,每个实例都需要位于一个单独的消费者组中,我通过创建随机消费者组名称来实现它group-id: my-service-${random.uuid}。我想知道如果所有消费者都消失了,一个消费者组会发生什么。我注意到在 Confluence 平台中,我有很多消费者组,但没有任何消费者。

没有任何消费者的消费群体还能存在多久?

如何配置消费者组在服务被删除后 5 分钟后被删除(我希望删除该组,而不仅仅是删除偏移量)?

spring apache-kafka spring-boot kafka-consumer-api spring-kafka

11
推荐指数
1
解决办法
2901
查看次数

如何以编程方式自动配置?

我有一个春季启动kafka应用程序。我的经纪人每隔几天就会被回收一次。旧的经纪人已取消配置,新的经纪人已配置。

我有一个调度程序,每隔几个小时要检查一次代理。我想确保一旦有了新的经纪人,我们就应该重新加载所有与Spring Kafka相关的bean。与KafkaAutoConfiguration非常相似,除了我希望触发代理值更改并以编程方式加载自动配置。

每当将旧的代理替换为新的代理时,如何以编程方式调用自动配置?

java spring spring-boot spring-kafka

10
推荐指数
1
解决办法
424
查看次数