Ada*_*edi 2 java apache-kafka kafka-consumer-api
我刚刚开始使用Kafka。我面对消费者的一个小问题。我已经用Java编写了一个使用者。
我收到此异常-IllegalStateException此使用者已经关闭。
我在以下行得到异常:
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Run Code Online (Sandbox Code Playgroud)
在我的使用者崩溃并出现异常之后,这种情况开始发生,当我再次尝试运行它时,它给了我这个异常。
这是完整的代码:
package StreamApplicationsTest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class StreamAppConsumer {
public static void main(String[] args){
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");
Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
while(iterator.hasNext()){
i++;
ConsumerRecord<String,String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if(key=="exit" || value=="exit")
break;
System.out.println("Key="+key+"\tValue="+value);
}
System.out.println("Messages processed = "+Integer.toString(i));
consumer.close();
}
}
}
Run Code Online (Sandbox Code Playgroud)
我只是被这个问题困扰,任何帮助都是有用的。
之所以发生这种情况,是因为您在无限循环结束时关闭了使用者,因此当它第二次轮询使用者已经关闭时。为了处理眼前的问题,我将整个while(true)循环包装在try-catch中,并在catch或finally块中处理使用者关闭。
但是,如果Kafka用户未正确处理不同的关闭信号,则可能会丢失数据。我建议在此处查看Confluent的示例,以了解如何关闭用户。在您的情况下,因为您正在主线程中运行,所以它看起来像这样……
public static void main(String[] args) {
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);
Runtime.getRuntime().addShutdownHook(new Thread()
{
public void run() {
consumer.wakeup();
}
});
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
i++;
ConsumerRecord<String, String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if (key == "exit" || value == "exit")
break;
System.out.println("Key=" + key + "\tValue=" + value);
}
System.out.println("Messages processed = " + Integer.toString(i));
}
} catch (WakeupExection e) {
// Do Nothing
} finally {
consumer.close();
}
}
}
Run Code Online (Sandbox Code Playgroud)
基本上运行consumer.wakeup()是使用者中唯一的线程安全方法,因此,它是唯一可以在Java的shutdown钩子内部运行的方法。由于消费者在调用唤醒时没有处于睡眠状态,因此它会跳出唤醒执行,从而正常关闭消费者。
| 归档时间: |
|
| 查看次数: |
3123 次 |
| 最近记录: |