Ben*_*han 5 java windows apache-kafka apache-kafka-streams
我正在开发一个简单的Kafka Stream应用程序,该应用程序从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用Intelij进行开发。
当我调试/运行此应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,则它可以完美运行
(即使用BOOTSTRAP_SERVERS_CONFIG = localhost:9092和SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)
但是,当我尝试使用另一台机器进行开发时
(即BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092和SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081,其中XXX.XXX.XXX是我的Kafka的IP地址),
调试过程在第一次运行就没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
Run Code Online (Sandbox Code Playgroud)
如果我改变my_application_id
为my_application_id2
,并运行它,它再次工作在第一时间,但再次接收错误,如果我再次运行。
我在应用程序的最后一句话中有以下代码:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Run Code Online (Sandbox Code Playgroud)
有什么建议如何解决这个问题?
更新:
我已经查看了在我的开发机(Windows平台)中创建的状态目录,并且如果我在第二次运行之前手动删除了这些目录,则没有发现错误。我试图以管理员身份运行IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。
全栈供参考:
INFO Kafka版本:1.1.0(org.apache.kafka.common.utils.AppInfoParser:109)INFO Kafka commitId:fdcf75ea326b8e07(org.apache.kafka.common.utils.AppInfoParser:110)INFO流线程[main]删除用户调用清理的任务0_0的状态目录0_0。(org.apache.kafka.streams.processor.internals.StateDirectory:281)与目标VM断开连接,地址:“ 127.0.0.1:16552”,传输:“ socket”线程“ main” org.apache.kafka中的异常。 stream.errors.StreamsException:java.nio.file.DirectoryNotEmptyException:C:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231 ),位于com.macroviewhk.financialreport.simpleStream上的org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)。634)错误流线程[主]无法删除状态目录。(org.apache.kafka.streams.processor.internals.StateDirectory:297)在org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)java.nio.file.DirectoryNotEmptyException:C: org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)上的\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0位于sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java: 266)...在org.apache.kafka.common的java.nio.file.Files.delete(Files.java:1126)处的sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)处还有3个。 utils.Utils $ 1.postVisitDirectory(Utils.java:651)在org.apache.kafka.common.utils.Utils $ 1。
更新2:经过另一次详细检查后,下面的行引发IOException
Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
Run Code Online (Sandbox Code Playgroud)
该行位于kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class
可能是Windows系统的问题(对不起,我不是有经验的JAVA程序员)。
小智 5
对于Google员工。
我目前正在使用此Scala代码来帮助Windows家伙处理状态存储的删除。
if (System.getProperty("os.name").toLowerCase.contains("windows")) {
logger.info("WINDOWS OS MODE - Cleanup state store.")
try {
FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
} catch {
case e: Exception => logger.error(e.toString)
}
}
else {
streams.cleanUp()
}
Run Code Online (Sandbox Code Playgroud)
我同意@ideano1,这似乎与https://issues.apache.org/jira/browse/KAFKA-6647有关——你可以尝试的是,在测试之间显式调用KafkaStreams#cleanUp()
。目前尚不清楚 Window 操作系统出现问题的原因。Atm,所有测试都在 Linux 上进行。
归档时间: |
|
查看次数: |
4437 次 |
最近记录: |