gst*_*low 4 java multithreading shutdown-hook apache-kafka kafka-consumer-api
我正在读这篇文章
这里是完成消费者线程的代码:
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); 1
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Run Code Online (Sandbox Code Playgroud)
据我了解,ShutdownHook 在所有非守护线程完成但在进程被操作系统销毁之前调用。
1. 从我的角度来看 mainThread.join() 是没有用的。主线程总是会在 ShutdownHook 执行的那一刻结束。这是正确的还是我误解了什么?
2.其实我不明白为什么我们需要等待主线程?我们需要等待 close 方法执行吗?
书中提供了以下主要方法代码:
try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records =
movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "-- waiting for data...");
for (ConsumerRecord<String, String> record :
records) {
System.out.printf("offset = %d, key = %s,
value = %s\n",
record.offset(), record.key(),
record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown 2
} finally {
consumer.close(); 3
System.out.println("Closed consumer and we are done");
}
}
Run Code Online (Sandbox Code Playgroud)
您确实consumer.wakeup()
会中断当前消费者的操作(这可能是长时间运行的(例如轮询)甚至被阻止(在beginningOffsets(...)
.
放置mainThread.join()
在那里是为了确保主线程实际完成并且在唤醒后的处理过程中不会被关闭。请记住,shutdownHook 还负责处理中断,而不仅仅是普通的程序关闭。
因此,如果您使用 ctrl-C 中断:
1. shutdown hook gets called
2. main thread is still running, most often waiting for data in `poll`
3. shutdown hook `wakeup`-s the main thread
4. main thread enters the exception handler, breaks the loop
5. main thread closes the consumer with `.close()`
6. shutdown hook waits for 5. and finishes
Run Code Online (Sandbox Code Playgroud)
如果不等待,您可能还没有执行步骤 4 和 5 中的消费者关闭步骤。
归档时间: |
|
查看次数: |
6026 次 |
最近记录: |