是的,我已经阅读了所有找到的文档,并尝试了所有其他配置方法,但是仅此简单的示例(该行应该记录一行)不起作用
(这是一个带有spring-cloud-stream-binder-kafka-streams的Spring-Boot-2应用程序)
Kafka正在存储一个字符串值(空键)
我的aplilication.yaml
spring:
cloud:
stream:
bindings:
input:
destination: 'myStreamTopic'
output:
producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
kafka:
streams:
binder:
configuration:
default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
brokers:
- 'ommited:9092'
- 'ommited:9092'
- 'ommited:9092'
application-id: hack1
Run Code Online (Sandbox Code Playgroud)
只需将以下简单代码作为POC:
@SpringBootApplication
@Slf4j
public class HackatonApplication {
public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}
@EnableBinding(KafkaStreamsProcessor.class)
public static class LineProcessor {
@StreamListener(Sink.INPUT)
public void process(KStream<?, String> line) {
log.info("Received: {}", line);
}
}
Run Code Online (Sandbox Code Playgroud)
我无法使其运行!
org.springframework.context.ApplicationContextException:无法启动bean'outputBindingLifecycle';嵌套异常是java.lang.IllegalArgumentException:尝试调用公共抽象org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper ),但尚未设置任何代表。在org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在org.springframework.context.support处。 DefaultLifecycleProcessor.access $ 200(DefaultLifecycleProcessor.java:52)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在
抱歉,如果这很琐碎,但我已经花了几个小时进行搜索,谷歌搜索并试图找到有据可查的解决方案。