小编Sun*_*pta的帖子

kafka使用新的消费者api升级到.9

我们正在将我们的kafka实现升级到.9并使用新的消费者java api来创建消费者.我正在使用以下代码用于消费者,我们正在使用设置主题到消费者,如在线A和线B是调用我们的服务处理我们收到的消息.现在问题是如果我们的消息处理花费超过30秒,我们就会得到Exception.

    Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("auto.offset.reset", "earliest");
            props.put("heartbeat.interval.ms", "1000");
            props.put("receive.buffer.bytes", 10485760);
            props.put("fetch.message.max.bytes", 5242880);
            props.put("enable.auto.commit", false);
    //with partition assigned to consumer


            KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
           // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
            //consumer.assign(Arrays.asList(partition0));
            //assign topic to consumer without partition
//LINE A
            consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                try {
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    consumeFromQueue(records);//LINE B
                    consumer.commitSync();

                } catch (CommitFailedException e) {
                    e.printStackTrace();
                    System.out.println("CommitFailedException");
                } catch (Exception …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

6
推荐指数
1
解决办法
4397
查看次数

卡夫卡消费群体正在重新平衡

我正在使用Kafka .9和新的java消费者.我在一个循环中进行轮询.当代码尝试执行consumer.commitSycn时,由于组重新平衡,我得到了commitfailedexcption.请注意,我将session.timeout.ms添加为30000,heartbeat.interval.ms为10000添加到消费者,并且轮询确实在30000中发生.任何人都可以帮助我.如果需要任何信息,请告诉我.

这是代码: -

       Properties props = new Properties();
        props.put("bootstrap.servers", {allthreeservers});
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ObjectSerializer.class.getName());
        props.put("auto.offset.reset", erlierst);
        props.put("enable.auto.commit", false);
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);
        props.put("kafka.consumer.topic.name", topic);
        props.put("max.partition.fetch.bytes", 1000);   

        while (true) {
            Boolean isPassed = true;
            try {
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                if (records.count() > 0) {
                        ConsumeEventInThread consumerEventInThread = new ConsumeEventInThread(records, consumerService);
                        FutureTask<Boolean> futureTask = new FutureTask<>(consumerEventInThread);
                        executorServiceForAsyncKafkaEventProcessing.execute(futureTask);
                        try {
                            isPassed = (Boolean) futureTask.get(Long.parseLong(props.getProperty("session.timeout.ms")) - Long.parseLong("5000"), TimeUnit.MILLISECONDS);
                        } catch (Exception Exception) {
                            logger.warn("Time out after waiting …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
9273
查看次数

kafka 分区重新平衡(分配)花费太多时间

我们正在使用带有 3 个服务器的 kafka 集群,所有这些服务器也都有 zookeeper。我们有 7 个主题,每个主题有 6 个分区。每个主题我们有 3 个 Java 消费者。当我启动消费者时,将分区分配给消费者几乎需要 3-5 分钟。当我们停止其中一个消费者并再次启动时,会遇到相同的行为。我怎样才能控制或减少它?

请注意,我正在使用 kafka 0.9,新消费者

我在server.properties每个 kafka 中添加了以下属性

auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=10
Run Code Online (Sandbox Code Playgroud)

如果您需要更多信息,请与我们联系。谢谢

apache-kafka kafka-consumer-api kafka-producer-api

3
推荐指数
1
解决办法
2117
查看次数