标签: kafka-consumer-api

卡夫卡消费者抵消最大值?

我在谷歌上搜索和阅读Kafka文档,但我找不到消费者偏移的最大值以及最大值后是否有偏移环绕.我理解offset是一个Int64值,所以最大值是0xFFFFFFFFFFFFFFFF.如果有回旋,卡夫卡如何处理这种情况?

offset apache-kafka kafka-consumer-api

13
推荐指数
1
解决办法
6567
查看次数

kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException

我正在为kafka运行一个简单的消费者,例如:

int timeout = 80000;
int bufferSize = 64*1024;
consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName);
Run Code Online (Sandbox Code Playgroud)

这运行好几个小时,但后来我在kafka.consumer.SimpleConsumer上得到一个例外:由于套接字错误重新连接:

java.nio.channels.ClosedChannelException

和消费者停止......以前有人遇到过这个问题吗?

java sockets nio apache-kafka kafka-consumer-api

13
推荐指数
1
解决办法
6014
查看次数

卡夫卡消费者 - 民意调查行为

关于KafkaConsumer(> = 0.9),我正面临一些严重的问题,试图为我的需求实施解决方案.

让我们假设我有一个函数必须只读取来自kafka主题的n条消息.

例如:getMsgs(5)- > 在主题中获取下一个5 kafka消息.

所以,我有一个看起来像这样的循环:

for (boolean exit= false;!exit;)
{
   Records = consumer.poll(config.pollTime);
   for (Record r:records) {
       processRecord(r); //do my things
       numMss++;
       if (numMss==maximum) //maximum=5
          exit=true;
   }
}
Run Code Online (Sandbox Code Playgroud)

考虑到这一点,问题是poll()方法可以获得超过5条消息.例如,如果它获得10条消息,我的代码将永远忘记其他5条消息,因为Kafka会认为它们已经消耗掉了.

我尝试提交偏移但似乎不起作用:

    consumer.commitSync(Collections.singletonMap(partition,
    new OffsetAndMetadata(record.offset() + 1)));
Run Code Online (Sandbox Code Playgroud)

即使使用偏移配置,每当我再次启动消费者时,它都不会从第6条消息开始(记住,我只想要5条消息),但是从第11条开始(因为第一次轮询消耗了10条消息).

有没有解决方案呢,或者(最肯定的)我错过了什么?

提前致谢!!

apache-kafka kafka-consumer-api

13
推荐指数
1
解决办法
3万
查看次数

为什么Kafka消费者需要很长时间才能开始消费?

我们启动一个Kafka消费者,聆听可能尚未创建的主题(尽管已启用主题自动创建).

此后不久,制作人就该主题发布消息.

但是,消费者需要一些时间才能注意到这一点:确切地说是5分钟.此时,消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了这个群体.查看消费者与kafka日志的时间戳,此过程在消费者端实例化.

我想这是预期的行为,但我想理解这一点.这实际上是一个重新平衡(从0到1分区)?如果我们提前创建主题,这不会发生吗?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
4574
查看次数

Kafka消费者异常和抵消提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.

我想知道是否有人能够提供帮助:

  1. 分享围绕Kafka消费者在发生故障时应该做的最佳实践
  2. 帮助我了解AckMode Record如何工作,以及如何在侦听器方法中抛出异常时阻止对Kafka偏移队列的提交.

2的代码示例如下:

鉴于AckMode设置为RECORD,根据文档:

处理记录后,侦听​​器返回时提交偏移量.

我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.

我的配置:

    private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我的代码:

@Component
public class KafkaMessageListener{
    @KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
6823
查看次数

一个kafka topic可以处理多少个消费者群体?

假设我有一个大约有 10 个分区的 kafka 主题,我知道每个消费者组应该有 10 个消费者在任何给定时间从该主题中读取数据,以实现最大并行度。

然而,我想知道对于一个主题在任何给定时间点可以处理的消费者组的数量是否也有任何直接的规则。(最近在一次采访中我被问到了这个问题)。据我所知,这取决于代理的配置,以便它在任何给定时间点可以处理多少个连接。

但是,只是想知道在给定时间点可以扩展多少个最大消费者组(每个消费者组有 10 个消费者)?

apache-kafka kafka-consumer-api kafka-topic

13
推荐指数
3
解决办法
1万
查看次数

Kafka使用者偏移超出范围,没有为分区配置重置策略

我在启动Kafka消费者时收到异常.

org.apache.kafka.clients.consumer.OffsetOutOfRangeException:偏移超出范围,没有为分区配置重置策略{test-0 = 29898318}

我正在使用Kafka版本9.0.0和Java 7.

java apache-kafka kafka-consumer-api

12
推荐指数
1
解决办法
3万
查看次数

如何从kafka服务器获取主题中的所有消息

我想从服务器的主题开始获取所有消息。

前任:

bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --topic testTopic --from-beginning

使用上述控制台命令时,我希望能够从一开始就获取主题中的所有消息,但我无法使用 java 代码从一开始就使用主题中的所有消息。

apache-kafka kafka-consumer-api

12
推荐指数
3
解决办法
3万
查看次数

Kafka bootstrap-servers与kafka-console-consumer中的zookeeper

我正在尝试使用3个代理和zookeeper来测试运行单个Kafka节点.我希望使用控制台工具进行测试.我这样运行生产者:

kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic testTopic
Run Code Online (Sandbox Code Playgroud)

然后我这样运行消费者:

kafka-console-consumer --zookeeper localhost:2181 --topic testTopic --from-beginning
Run Code Online (Sandbox Code Playgroud)

我可以按照预期在生产者中输入消息并在消费者中查看消息.但是,当我使用bootstrap-server运行使用者的更新版本时,我什么也得不到.例如

kafka-console-consumer --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic testTopic --from-beginning
Run Code Online (Sandbox Code Playgroud)

当我在端口9092上运行一个代理时,这很好用,所以我很困惑.有没有办法可以看到zookeeper提供什么作为引导程序服务器?引导服务器与代理列表不同吗?Kafka使用Scala 2.11编译.

apache-kafka kafka-consumer-api

12
推荐指数
1
解决办法
3万
查看次数

Kafka控制台消费者错误"分区上的偏移提交失败"

我正在使用a kafka-console-consumer探测kafka主题.

间歇性地,我收到此错误消息,然后是2个警告:

[2018-05-01 18:14:38,888] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-56648] Offset commit failed on partition my-topic-0 at offset 444: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[2018-05-01 18:14:38,888] WARN [Consumer clientId=consumer-1, groupId=console-consumer-56648] Asynchronous auto-commit of offsets {my-topic-0=OffsetAndMetadata{offset=444, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

12
推荐指数
1
解决办法
1万
查看次数