我们正在将我们的kafka实现升级到.9并使用新的消费者java api来创建消费者.我正在使用以下代码用于消费者,我们正在使用设置主题到消费者,如在线A和线B是调用我们的服务处理我们收到的消息.现在问题是如果我们的消息处理花费超过30秒,我们就会得到Exception.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception …Run Code Online (Sandbox Code Playgroud) 我正在使用Kafka .9和新的java消费者.我在一个循环中进行轮询.当代码尝试执行consumer.commitSycn时,由于组重新平衡,我得到了commitfailedexcption.请注意,我将session.timeout.ms添加为30000,heartbeat.interval.ms为10000添加到消费者,并且轮询确实在30000中发生.任何人都可以帮助我.如果需要任何信息,请告诉我.
这是代码: -
Properties props = new Properties();
props.put("bootstrap.servers", {allthreeservers});
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ObjectSerializer.class.getName());
props.put("auto.offset.reset", erlierst);
props.put("enable.auto.commit", false);
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);
props.put("request.timeout.ms", 31000);
props.put("kafka.consumer.topic.name", topic);
props.put("max.partition.fetch.bytes", 1000);
while (true) {
Boolean isPassed = true;
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
if (records.count() > 0) {
ConsumeEventInThread consumerEventInThread = new ConsumeEventInThread(records, consumerService);
FutureTask<Boolean> futureTask = new FutureTask<>(consumerEventInThread);
executorServiceForAsyncKafkaEventProcessing.execute(futureTask);
try {
isPassed = (Boolean) futureTask.get(Long.parseLong(props.getProperty("session.timeout.ms")) - Long.parseLong("5000"), TimeUnit.MILLISECONDS);
} catch (Exception Exception) {
logger.warn("Time out after waiting …Run Code Online (Sandbox Code Playgroud) 我们正在使用带有 3 个服务器的 kafka 集群,所有这些服务器也都有 zookeeper。我们有 7 个主题,每个主题有 6 个分区。每个主题我们有 3 个 Java 消费者。当我启动消费者时,将分区分配给消费者几乎需要 3-5 分钟。当我们停止其中一个消费者并再次启动时,会遇到相同的行为。我怎样才能控制或减少它?
请注意,我正在使用 kafka 0.9,新消费者
我在server.properties每个 kafka 中添加了以下属性
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=10
Run Code Online (Sandbox Code Playgroud)
如果您需要更多信息,请与我们联系。谢谢