Sun*_*pta 5 apache-kafka kafka-consumer-api
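I am using Kafka 0.9 with the new Java consumer, and I poll inside a loop. When the code calls consumer.commitSync, I get a CommitFailedException because of a group rebalance. Note that I set session.timeout.ms to 30000 and heartbeat.interval.ms to 10000 on the consumer, and polling definitely happens within 30000 ms. Can anyone help me? Please let me know if any further information is needed.
Here is the code: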
Properties props = new Properties();
props.put("bootstrap.servers", {allthreeservers});
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ObjectSerializer.class.getName());
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", false);
// Stored as a String so that props.getProperty() below can read it back;
// getProperty() returns null for values that are not Strings.
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", 10000);
props.put("request.timeout.ms", 31000);
props.put("kafka.consumer.topic.name", topic);
props.put("max.partition.fetch.bytes", 1000);

while (true) {
    Boolean isPassed = true;
    try {
        ConsumerRecords<Object, Object> records = consumer.poll(1000);
        if (records.count() > 0) {
            ConsumeEventInThread consumerEventInThread = new ConsumeEventInThread(records, consumerService);
            FutureTask<Boolean> futureTask = new FutureTask<>(consumerEventInThread);
            executorServiceForAsyncKafkaEventProcessing.execute(futureTask);
            try {
                // Wait for the batch, but give up 5 seconds before the session timeout.
                isPassed = futureTask.get(Long.parseLong(props.getProperty("session.timeout.ms")) - 5000L, TimeUnit.MILLISECONDS);
            } catch (Exception exception) {
                logger.warn("Time out after waiting till session time out");
            }
            if (isPassed) {
                consumer.commitSync();
                logger.info("Successfully committed offset for topic " + Arrays.asList(props.getProperty("kafka.consumer.topic.name")));
            } else {
                logger.info("Failed to process consumed messages, will not Commit and consume again");
            }
        }
    } catch (Exception e) {
        logger.error("Unhandled exception in while consuming messages " + Arrays.asList(props.getProperty("kafka.consumer.topic.name")), e);
    }
}
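CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing are done in the foreground, the session timeout can expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the session.timeout.ms setting so that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.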
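As a rough illustration of the first option, the sketch below estimates an upper bound on the size of one poll() batch from max.partition.fetch.bytes and the number of assigned partitions, then builds the related timeout properties. The class FetchBudget, the partition count of 12, and the fetch size of 65536 are assumptions made up for this example; only the 30000/10000/31000 timeout pattern comes from the question.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client API.
class FetchBudget {

    // Rough upper bound on the bytes a single poll() may hand to the handler:
    // every assigned partition can contribute up to max.partition.fetch.bytes.
    static long worstCaseBatchBytes(int assignedPartitions, int maxPartitionFetchBytes) {
        return (long) assignedPartitions * maxPartitionFetchBytes;
    }

    // Builds the timeout-related settings as Strings so they can be read back
    // with getProperty().
    static Properties tunedProps(int sessionTimeoutMs, int maxPartitionFetchBytes) {
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Same ratio as in the question: heartbeat at one third of the session timeout.
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        // Same pattern as in the question: request timeout slightly above the session timeout.
        props.put("request.timeout.ms", String.valueOf(sessionTimeoutMs + 1000));
        props.put("max.partition.fetch.bytes", String.valueOf(maxPartitionFetchBytes));
        return props;
    }

    public static void main(String[] args) {
        int partitions = 12;      // assumed number of assigned partitions
        int fetchBytes = 65536;   // assumed max.partition.fetch.bytes
        System.out.println("Worst-case batch bytes: " + worstCaseBatchBytes(partitions, fetchBytes));
        System.out.println(tunedProps(30000, fetchBytes));
    }
}
```

If the handler cannot get through a batch of that size before the session timeout, either the fetch size has to come down or the timeout has to go up.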
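The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up.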
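One possible way to manage that flow control is to cap the number of records in flight and have the polling thread check for capacity instead of blocking. The sketch below uses only the JDK; BoundedProcessor and its methods are hypothetical names invented for this example, and the Kafka side is left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: caps how many tasks may be queued or running at once.
class BoundedProcessor {
    private final ExecutorService pool;
    private final Semaphore permits;

    BoundedProcessor(int threads, int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxInFlight);
    }

    // Returns false instead of blocking when the workers are saturated, so the
    // calling (polling) thread stays free and can retry the task later.
    boolean trySubmit(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;
        }
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot even if the task throws
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the task was never accepted by the pool
            throw e;
        }
        return true;
    }

    boolean hasCapacity() {
        return permits.availablePermits() > 0;
    }

    // Stops accepting tasks and waits up to 10 seconds for running ones to finish.
    boolean shutdownAndWait() {
        pool.shutdown();
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BoundedProcessor processor = new BoundedProcessor(2, 2);
        for (int i = 0; i < 5; i++) {
            boolean accepted = processor.trySubmit(() -> sleepQuietly(200));
            System.out.println("record " + i + (accepted ? " accepted" : " rejected, retry later"));
        }
        processor.shutdownAndWait();
    }
}
```

In a consumer loop, a rejected submission would typically be the signal to stop fetching for a while (KafkaConsumer offers pause and resume for this) while still calling poll, and to resume once hasCapacity() is true again.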
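You can set session.timeout.ms large enough that commit failures from rebalances are rare. The only drawback is a longer delay before partitions can be reassigned in the event of a hard failure.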
For more details, see the doc.
Here is a working example.
----Worker code-----
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class Worker implements Callable<Boolean> {
    private final ConsumerRecord<String, String> record;

    public Worker(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public Boolean call() {
        Map<String, Object> data = new HashMap<>();
        try {
            data.put("partition", record.partition());
            data.put("offset", record.offset());
            data.put("value", record.value());
            // Simulate 10 seconds of processing work.
            Thread.sleep(10000);
            System.out.println("Processing Thread---" + Thread.currentThread().getName() + " data: " + data);
            return Boolean.TRUE;
        } catch (Exception e) {
            e.printStackTrace();
            return Boolean.FALSE;
        }
    }
}
---------Executor code------------------
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.*;

public class AsyncConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", false);
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("Test1", "Test2"));

        int poolSize = 10;
        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);
        try {
            while (true) {
                System.out.println("Polling................");
                ConsumerRecords<String, String> records = consumer.poll(1000);
                List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    recordList.add(record);
                    // Process and commit in batches of poolSize records.
                    if (recordList.size() == poolSize) {
                        processAndCommit(consumer, completionService, recordList);
                        recordList.clear();
                    }
                }
                // Process the final partial batch so that no polled record is skipped.
                if (!recordList.isEmpty()) {
                    processAndCommit(consumer, completionService, recordList);
                }
            }
        } finally {
            consumer.close();
            es.shutdown();
        }
    }

    // Submits one batch to the pool, waits for every task to finish, then commits
    // the next offset to read for each partition that appeared in the batch.
    private static void processAndCommit(KafkaConsumer<String, String> consumer,
                                         CompletionService<Boolean> completionService,
                                         List<ConsumerRecord<String, String>> batch) {
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
        for (ConsumerRecord<String, String> r : batch) {
            completionService.submit(new Worker(r));
            // Offsets increase within a partition, so the last put per partition wins.
            commitOffsets.put(new TopicPartition(r.topic(), r.partition()),
                    new OffsetAndMetadata(r.offset() + 1));
        }
        int taskCount = batch.size();
        while (taskCount > 0) {
            try {
                Future<Boolean> futureResult = completionService.poll(1, TimeUnit.SECONDS);
                if (futureResult != null) {
                    taskCount = taskCount - 1;
                    if (!futureResult.get()) {
                        System.out.println("A record failed processing");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Note: as in the original example, offsets are committed even if a worker failed.
        consumer.commitSync(commitOffsets);
    }
}
You need to follow these rules:
1) Pass a fixed number of records (for example, 10) to ConsumeEventInThread.
2) Use several threads for processing instead of one, and submit all tasks to a CompletionService.
3) Poll all submitted tasks and verify their results.
4) Then commit, using the commitSync overload that takes an offsets map instead of the no-argument one.
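For rule 4, the offsets map needs one entry per partition, each holding the offset of the last processed record plus one. The sketch below shows that calculation with plain JDK types so it runs without a broker. Rec and CommitOffsets are hypothetical stand-ins invented for this example; with the real client the keys would be TopicPartition objects and the values OffsetAndMetadata objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the ConsumerRecord fields that matter for committing.
class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical helper for illustration; not part of the Kafka client API.
class CommitOffsets {

    // Maps "topic-partition" to the next offset to read, which is the highest
    // processed offset in that partition plus one.
    static Map<String, Long> nextOffsets(List<Rec> processed) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>();
        batch.add(new Rec("Test1", 0, 5));
        batch.add(new Rec("Test1", 0, 6));
        batch.add(new Rec("Test2", 1, 3));
        System.out.println(nextOffsets(batch));
    }
}
```

Committing only the last record's partition would leave the other partitions in the batch uncommitted, which is why the map is built across the whole batch.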