小编Ben*_*han的帖子

Kafka Stream with Avro in JAVA , schema.registry.url" 没有默认值

我的 Kafka Stream 应用程序具有以下配置

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());

    // Exactly once processing!!
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
Run Code Online (Sandbox Code Playgroud)

我试图更换线路

config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

config.put("schema.registry.url","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

但有同样的错误

在准备我的 Stream 应用程序时,我已按照 …

java avro apache-kafka apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
1万
查看次数

无法在IDE中删除Kafka Stream Application的状态目录

我正在开发一个简单的Kafka Stream应用程序,该应用程序从一个主题中提取消息,并在转换后将其放入另一个主题中。我正在使用Intelij进行开发。

当我调试/运行此应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,则它可以完美运行

(即使用BOOTSTRAP_SERVERS_CONFIG = localhost:9092和SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)

但是,当我尝试使用另一台机器进行开发时

(即BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092和SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081,其中XXX.XXX.XXX是我的Kafka的IP地址),

调试过程在第一次运行就没有问题。但是,当我在重置偏移量后第二次运行时,收到以下错误:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
Run Code Online (Sandbox Code Playgroud)

如果我改变my_application_idmy_application_id2,并运行它,它再次工作在第一时间,但再次接收错误,如果我再次运行。

我在应用程序的最后一句话中有以下代码:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Run Code Online (Sandbox Code Playgroud)

有什么建议如何解决这个问题?

更新:

我已经查看了在我的开发机(Windows平台)中创建的状态目录,并且如果我在第二次运行之前手动删除了这些目录,则没有发现错误。我试图以管理员身份运行IDE,因为我认为这可能与文件夹的权限有关。但是,这没有帮助。

全栈供参考:

INFO Kafka版本:1.1.0(org.apache.kafka.common.utils.AppInfoParser:109)INFO Kafka commitId:fdcf75ea326b8e07(org.apache.kafka.common.utils.AppInfoParser:110)INFO流线程[main]删除用户调用清理的任务0_0的状态目录0_0。(org.apache.kafka.streams.processor.internals.StateDirectory:281)与目标VM断开连接,地址:“ 127.0.0.1:16552”,传输:“ socket”线程“ main” org.apache.kafka中的异常。 stream.errors.StreamsException:java.nio.file.DirectoryNotEmptyException:C:\ workspace \ bennychan \ kafka-streams \ my_application_001 \ 0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231 ),位于com.macroviewhk.financialreport.simpleStream上的org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)。634)错误流线程[主]无法删除状态目录。(org.apache.kafka.streams.processor.internals.StateDirectory:297)在org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)java.nio.file.DirectoryNotEmptyException:C: org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)上的\ workspace \ bennychan \ kafka-streams \ my_application_001 …

java windows apache-kafka apache-kafka-streams

5
推荐指数
2
解决办法
4437
查看次数