相关疑难解决方法(0)

删除zookeeper中的kafka使用者组

我正在使用kafka_2.9.2-0.8.1.1和zookeeper 3.4.6.

是否有可以从zookeeper中自动删除使用者组的实用程序?或者我可以删除zookeeper中/ consumers/[group_id]下的所有内容吗?如果是后者,还有什么我错过了吗?这可以用现场系统完成吗?

apache-kafka apache-zookeeper

18
推荐指数
4
解决办法
4万
查看次数

不清楚Kafka中auto.offset.reset和enable.auto.commit的含义

我是Kafka的新手,我不太了解Kafka配置的含义,任何人都可以解释为什么更容易理解!

这是我的代码:

 val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "GROUP_2017",
  "auto.offset.reset" -> "latest", //earliest or latest
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
Run Code Online (Sandbox Code Playgroud)

这在我的代码中意味着什么?

apache-kafka kafka-consumer-api

10
推荐指数
3
解决办法
2万
查看次数

KafkaConsumer:`seekToEnd()`不会让消费者从最新的偏移量开始消费

我有以下代码

class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {

    fun run() {
        consumer.seekToEnd(emptyList())
        val pollDuration = 30 // seconds

        while (true) {
            val records = consumer.poll(Duration.ofSeconds(pollDuration))
            // perform record analysis and commitSync()
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者订阅的主题持续接收记录。有时,消费者会因处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为该seekToEnd()方法可以确保这一点。然而,这个方法似乎根本没有效果。消费者从崩溃的偏移量开始消费。

正确的使用方法是什么seekToEnd()

编辑:使用以下配置创建消费者

fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
    val props = setupConfig(valueDeserializer)
    Common.setupConsumerSecurityProtocol(props)
    return createConsumer(props)
}

fun setupConfig(valueDeserializer: String): Properties {
    // Configuration setup
    val props = Properties()

    props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
    props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka kafka-consumer-api

8
推荐指数
1
解决办法
3599
查看次数

Spring Kafka - 如何使用组ID将偏移重置为最新?

我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)

我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?

java spring spring-integration apache-kafka spring-kafka

7
推荐指数
2
解决办法
7998
查看次数

Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}
Run Code Online (Sandbox Code Playgroud)

ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。

重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException

然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。

如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。

关于如何实现此功能的任何想法?

注:enable.auto.commit设为falseauto.offset.reset设为earliest

谢谢!

apache-kafka spring-retry spring-kafka

7
推荐指数
1
解决办法
9247
查看次数

Kafka Consumer 配置 - auto.offset.reset 如何控制消息消费

我试图了解 ConsumerConfig.auto.offset.reset = latest 如何影响消息消耗。

例如,我有一个消费者,最初在 t1 时间发送 100 条消息,然后我的消费者在 t1+30 秒启动并运行,然后我的消费者会消费 t1+30 秒后发布的消息还是会消费 t1 之后发布的消息?

apache-kafka

6
推荐指数
1
解决办法
2993
查看次数

Kafka消费者不会从最新消息开始

我希望有一个Kafka Consumer,它从一个主题中的最新消息开始.

这是java代码:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

尽管auto.offset.reset的值是最新的,但是消费者会在2天前启动表单消息,然后赶上最新的消息.

我错过了什么?

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
5405
查看次数

Kafka 中的消费者 ID 和组 ID:是什么让两个消费者相同

我已经使用 Kafka 几个月了,我意识到一些核心概念对我来说还不是很清楚。我的疑问与consumerId、groupId 和offsets 之间的关系有关。在我们的应用程序中,我们需要 Kafka 使用发布 - 订阅范式工作,因此我们为每个消费者使用不同的组 ID,这些 ID 是随机生成的。

我曾经认为设置auto.offset.reset = latest我的消费者总是会收到他们尚未收到的消息,但最近我了解到事实并非如此。这仅在消费者尚未提交偏移量时才有效。在任何其他情况下,消费者将继续接收偏移量大于其提交的最后一个偏移量的消息。

由于我总是使用随机组 ID 创建新消费者,我意识到我的消费者“没有记忆”,他们是新消费者,他们永远不会提交偏移量,因此该auto.offset.reset = latest政策将始终适用。这就是我怀疑的地方。假设以下场景

  1. 我有两个客户端应用程序,A 和 B,每个都有一个消费者,以发布 - 订阅方式工作(因此,具有不同的组 ID)。两个消费者都订阅了该主题my-topicauto.offset.reset设置适用latest于两个消费者。
  2. 一些生产者(或生产者)将消息 M1、M2 和 M3 发布到 topic my-topic
  3. A 和 B 都接收 M1、M2 和 M3。
  4. 现在我关闭应用程序 B。
  5. 生产者产生消息 M4 和 M5。
  6. 应用程序 A 接收消息 M4 和 M5。
  7. 现在我重新启动应用程序 B。记住,它groupId是随机的,而且我没有设置任何消费者 ID,所以这意味着这是一个新的消费者(对吧?)。应用程序 B 没有收到任何消息。
  8. 生产者发布消息 M6 和 M7。
  9. 应用程序 A …

publish-subscribe apache-kafka kafka-consumer-api

4
推荐指数
1
解决办法
4347
查看次数