我有一个简单的流处理器(不是消费者/生产者),如下所示(Kotlin)
\n@Bean\nfun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {\n return Function { input-> input.map { key, value ->\n println("\\nPAYLOAD KEY: ${key.name}\\n");\n println("\\nPAYLOAD value: ${value.address}\\n");\n val output = FooAddressPlus()\n output.address = value.address\n output.name = value.name\n output.plus = "$value.name-$value.address"\n KeyValue(key, output)\n }}\n}\n
Run Code Online (Sandbox Code Playgroud)\n这些类FooName
、FooAddress
和 与FooAddressPlus
处理器位于同一包中。\nHere\xe2\x80\x99s 我的配置文件:
spring.cloud.stream.kafka.binder:\n brokers: localhost:9093\n\nspring.cloud.stream.function.definition: processFoo\n\nspring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor\nspring.cloud.stream.bindings.processFoo-in-0:\n destination: foo.processor\nspring.cloud.stream.bindings.processFoo-out-0:\n destination: foo.processor.out\n\nspring.cloud.stream.kafka.streams.binder:\n deserializationExceptionHandler: logAndContinue\n configuration:\n default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n commit.interval.ms: 1000\n
Run Code Online (Sandbox Code Playgroud)\n运行处理器时出现此错误:
\nThe class '<here_comes_package>.FooAddress' is not in the …
Run Code Online (Sandbox Code Playgroud)