使用UncaughtExceptionHandler重新启动或关闭流的正确方法

Sam*_*amy 7 apache-kafka-streams

我有一个带有以下驱动程序代码的流应用程序,用于实时消息转换.

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new  Thread(streams::close));
Run Code Online (Sandbox Code Playgroud)

执行几分钟后,应用程序抛出以下异常,然后不进行流.

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more
Run Code Online (Sandbox Code Playgroud)

我试图清除/tmp/kafka-streams/TRANSFORMATION-APP目录并重新启动应用程序,但再次抛出相同的异常.我注意到的一件事是应用程序工作正常,直到它转换所有积压消息,但在处理一些新消息后抛出异常!

有时它也会引发下面未被捕获的异常.

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed 
to rebalance
Run Code Online (Sandbox Code Playgroud)

抛出(其中一个)这些异常后,app仍在运行,但没有在流中进行.

处理这些错误的正确方法是什么?是否可以以编程方式重新启动流,而不会杀死应用程序?这个应用程序是monit.在最坏的情况下,我宁愿正确终止应用程序(没有任何消息丢失),以便monit可以重新启动它.

输入主题有100个分区,我num.stream.threads在应用程序配置中设置为100.该应用程序已开启Kafka 0.10.1.1-cp1.

Mat*_*Sax 7

Kakfa 0.10.1.x在多线程方面存在一些缺陷.您可以升级到0.10.2(今天发布的AK,很快就会推出CP 3.2)或者您应用以下解决方法:

  • 仅使用单线程执行
  • 如果您需要更多线程,请启动更多实例
  • 对于每个实例,配置不同的状态目录

在重新启动之前,您可能还需要删除本地状态目录(仅一次)以进入整体一致的应用程序状态.

无论如何,不​​会有数据丢失.即使出现故障,Kafka Streams也能保证至少一次处理语义.这也适用于本地商店 - 在删除本地状态目录后,在启动时将从底层Kafka更改日志主题重新创建这些状态(虽然这是一项昂贵的操作).

UncaughtExceptionHandler确实只提供你一个方法,找出一个线程死亡.它(直接)没有帮助重新启动您的应用程序.要恢复死亡线程,您需要KafkaStreams完全关闭实例并创建/启动新实例.我们希望将来能够为此提供更好的支持.

  • 没关系。您不应该同时由两个线程调用 `streams::close` —— 这可能会死锁。否则,可以在异常处理程序中关闭。 (2认同)
  • @MatthiasJ.Sax 如果它不应该一次被多个线程调用,那么为什么不在“KafkaStreams”类中使其同步(只是问一下!) (2认同)