无法在IDE中删除Kafka Stream Application的状态目录

Ben*_*han 5 java windows apache-kafka apache-kafka-streams

我正在开发一个简单的Kafka Stream应用程序,该应用程序从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用Intelij进行开发。

当我调试/运行此应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,则它可以完美运行

(即使用BOOTSTRAP_SERVERS_CONFIG = localhost:9092和SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)

但是,当我尝试使用另一台机器进行开发时

(即BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092和SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081,其中XXX.XXX.XXX是我的Kafka的IP地址),

调试过程在第一次运行就没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
Run Code Online (Sandbox Code Playgroud)

如果我改变my_application_idmy_application_id2,并运行它,它再次工作在第一时间,但再次接收错误,如果我再次运行。

我在应用程序的最后一句话中有以下代码:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Run Code Online (Sandbox Code Playgroud)

有什么建议如何解决这个问题?

更新:

我已经查看了在我的开发机(Windows平台)中创建的状态目录,并且如果我在第二次运行之前手动删除了这些目录,则没有发现错误。我试图以管理员身份运行IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。

全栈供参考:

INFO Kafka版本:1.1.0(org.apache.kafka.common.utils.AppInfoParser:109)INFO Kafka commitId:fdcf75ea326b8e07(org.apache.kafka.common.utils.AppInfoParser:110)INFO流线程[main]删除用户调用清理的任务0_0的状态目录0_0。(org.apache.kafka.streams.processor.internals.StateDirectory:281)与目标VM断开连接,地址:“ 127.0.0.1:16552”,传输:“ socket”线程“ main” org.apache.kafka中的异常。 stream.errors.StreamsException:java.nio.file.DirectoryNotEmptyException:C:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231 ),位于com.macroviewhk.financialreport.simpleStream上的org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)。634)错误流线程[主]无法删除状态目录。(org.apache.kafka.streams.processor.internals.StateDirectory:297)在org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)java.nio.file.DirectoryNotEmptyException:C: org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)上的\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0位于sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java: 266)...在org.apache.kafka.common的java.nio.file.Files.delete(Files.java:1126)处的sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)处还有3个。 utils.Utils $ 1.postVisitDirectory(Utils.java:651)在org.apache.kafka.common.utils.Utils $ 1。

更新2:经过另一次详细检查后,下面的行引发IOException

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
Run Code Online (Sandbox Code Playgroud)

该行位于kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

可能是Windows系统的问题(对不起,我不是有经验的JAVA程序员)。

小智 5

对于Google员工。

我目前正在使用此Scala代码来帮助Windows家伙处理状态存储的删除。

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}
Run Code Online (Sandbox Code Playgroud)


Mat*_*Sax 0

我同意@ideano1,这似乎与https://issues.apache.org/jira/browse/KAFKA-6647有关——你可以尝试的是,在测试之间显式调用KafkaStreams#cleanUp()。目前尚不清楚 Window 操作系统出现问题的原因。Atm,所有测试都在 Linux 上进行。