Sun*_*pta 6 apache-kafka kafka-consumer-api
我们正在将我们的kafka实现升级到.9并使用新的消费者java api来创建消费者.我正在使用以下代码用于消费者,我们正在使用设置主题到消费者,如在线A和线B是调用我们的服务处理我们收到的消息.现在问题是如果我们的消息处理花费超过30秒,我们就会得到Exception.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in while consuming messages");
}
Run Code Online (Sandbox Code Playgroud)
例外是
2016-03-03 10:47:35.095 INFO 6448 --- [ask-scheduler-3] oakccinternals.AbstractCoordinator:标记协调员2147483647死了.2016年3月3日10:47:35.096 ERROR 6448 --- [问调度-3] oakccinternals.ConsumerCoordinator:同时提交用于组TEST-GROUP CommitFailedException org.apache.kafka.clients.consumer.CommitFailedException偏移时发生错误ILLEGAL_GENERATION:提交不能完成,由于在org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)在org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler组重新平衡.在org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)在org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler处理(ConsumerCoordinator.java:493).的onSuccess(AbstractCoordinator.java:644)在org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess(RequestFuture.java:167)在org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuc 塞斯(RequestFuture.java:133)在org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ RequestFutureCompletionHandler.onComplete( ConsumerNetworkClient.java:380)在org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)在组织.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
提交偏移时出现上述异常.任何建议都会有所帮助,谢谢
发生这种情况是因为新的消费者是单线程的,并且它可以通过轮询或提交偏移量来保持心跳的唯一方法是,在30秒后,组协调员将您的消费者标记为已死并呼叫组重新平衡.对于这种情况,您可以增加request.timeout.ms或分割两个线程之间的消耗和处理工作.
| 归档时间: |
|
| 查看次数: |
4397 次 |
| 最近记录: |