我的 Kafka Stream 应用程序具有以下配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
Run Code Online (Sandbox Code Playgroud)
我试图更换线路
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)
和
config.put("schema.registry.url","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)
但有同样的错误
在准备我的 Stream 应用程序时,我已按照 …
java avro apache-kafka apache-kafka-streams confluent-schema-registry
我正在开发一个简单的Kafka Stream应用程序,该应用程序从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用Intelij进行开发。
当我调试/运行此应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,则它可以完美运行
(即使用BOOTSTRAP_SERVERS_CONFIG = localhost:9092和SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)
但是,当我尝试使用另一台机器进行开发时
(即BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092和SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081,其中XXX.XXX.XXX是我的Kafka的IP地址),
调试过程在第一次运行就没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
Run Code Online (Sandbox Code Playgroud)
如果我改变my_application_id为my_application_id2,并运行它,它再次工作在第一时间,但再次接收错误,如果我再次运行。
我在应用程序的最后一句话中有以下代码:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Run Code Online (Sandbox Code Playgroud)
有什么建议如何解决这个问题?
更新:
我已经查看了在我的开发机(Windows平台)中创建的状态目录,并且如果我在第二次运行之前手动删除了这些目录,则没有发现错误。我试图以管理员身份运行IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。
全栈供参考:
INFO Kafka版本:1.1.0(org.apache.kafka.common.utils.AppInfoParser:109)INFO Kafka commitId:fdcf75ea326b8e07(org.apache.kafka.common.utils.AppInfoParser:110)INFO流线程[main]删除用户调用清理的任务0_0的状态目录0_0。(org.apache.kafka.streams.processor.internals.StateDirectory:281)与目标VM断开连接,地址:“ 127.0.0.1:16552”,传输:“ socket”线程“ main” org.apache.kafka中的异常。 stream.errors.StreamsException:java.nio.file.DirectoryNotEmptyException:C:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231 ),位于com.macroviewhk.financialreport.simpleStream上的org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)。634)错误流线程[主]无法删除状态目录。(org.apache.kafka.streams.processor.internals.StateDirectory:297)在org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)java.nio.file.DirectoryNotEmptyException:C: org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)上的\ workspace \ bennychan \ kafka-streams \ my_application_001 …