Posts by Gon*_*bio

Spring Cloud Stream with the Kafka Streams binder: how to set `trusted.packages` for a stream processor (this differs from consumers and producers)

I have a simple stream processor (not a consumer/producer) that looks like this (Kotlin):

import java.util.function.Function
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.springframework.context.annotation.Bean

@Bean
fun processFoo(): Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input ->
        input.map { key, value ->
            println("\nPAYLOAD KEY: ${key.name}\n")
            println("\nPAYLOAD value: ${value.address}\n")
            val output = FooAddressPlus()
            output.address = value.address
            output.name = value.name
            // Braces are needed for property access; "$value.name" would interpolate only `value`.
            output.plus = "${value.name}-${value.address}"
            KeyValue(key, output)
        }
    }
}

The classes FooName, FooAddress, and FooAddressPlus are in the same package as the processor. Here's my configuration file:

spring.cloud.stream.kafka.binder:
  brokers: localhost:9093

spring.cloud.stream.function.definition: processFoo

spring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor
spring.cloud.stream.bindings.processFoo-in-0:
  destination: foo.processor
spring.cloud.stream.bindings.processFoo-out-0:
  destination: foo.processor.out

spring.cloud.stream.kafka.streams.binder:
  deserializationExceptionHandler: logAndContinue
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 1000

I get this error when running the processor:

The class '<here_comes_package>.FooAddress' is not in the …
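
For context, the "trusted packages" check in this message comes from Spring Kafka's JSON deserializer, whose allow-list is controlled by the `spring.json.trusted.packages` property. One thing that might be worth trying is passing that property through the binder-level `configuration` map already used above. This is only a sketch: it assumes the binder hands these properties to the JSON serde it uses for `FooAddress`, which this post does not confirm, and `<here_comes_package>` is the same placeholder as in the error message.

```yaml
spring.cloud.stream.kafka.streams.binder:
  configuration:
    # Assumption: the serde used for the input binding is configured from this map.
    # Replace the placeholder with the package that contains FooAddress;
    # "*" would trust every package, which is more permissive.
    spring.json.trusted.packages: "<here_comes_package>"
```

If the serde is not configured from this map in your setup, the property would have no effect, and supplying an explicitly configured serde for the binding would be the alternative to investigate.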

kotlin apache-kafka spring-cloud-stream

Score: 1, answers: 1, views: 1669
