无法仅运行此简单流,...需要SERDE配置吗?

ear*_*ron 2 spring-boot spring-cloud-stream apache-kafka-streams

是的,我已经阅读了所有找到的文档,并尝试了所有其他配置方法,但是仅此简单的示例(该行应该记录一行)不起作用

(这是一个带有spring-cloud-stream-binder-kafka-streams的Spring-Boot-2应用程序)

Kafka正在存储一个字符串值(空键)

我的aplilication.yaml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
    output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1
Run Code Online (Sandbox Code Playgroud)

只需将以下简单代码作为POC:

@SpringBootApplication
@Slf4j
public class HackatonApplication {

    public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}

  @EnableBinding(KafkaStreamsProcessor.class)
  public static class LineProcessor {

    @StreamListener(Sink.INPUT)
    public void process(KStream<?, String> line) {
      log.info("Received: {}", line);
    }

  }
Run Code Online (Sandbox Code Playgroud)

我无法使其运行!

org.springframework.context.ApplicationContextException:无法启动bean'outputBindingLifecycle';嵌套异常是java.lang.IllegalArgumentException:尝试调用公共抽象org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper ),但尚未设置任何代表。在org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在org.springframework.context.support处。 DefaultLifecycleProcessor.access $ 200(DefaultLifecycleProcessor.java:52)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在

抱歉,如果这很琐碎,但我已经花了几个小时进行搜索,谷歌搜索并试图找到有据可查的解决方案。

sob*_*cko 5

You are using the out of the box KafkaStreamsProcessor for binding which expects a single KStream as input and another KStream as output. If you are using this standard one, you must provide proper configuration for output binding (such as destination etc.). Then your method must return a KStream and you need to use the SendTo annotation on the spring side to bind. Something like below:

@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?,String> process(KStream<?, String> line) {
    log.info("Received: {}", line);
    return line;
}
Run Code Online (Sandbox Code Playgroud)

但是,您可以使用自定义处理器,并将其用于EnableBinding

interface CustomKafkaStreamsProcessor {
        @Input("input")
        KStream<?, ?> input();
    } 
Run Code Online (Sandbox Code Playgroud)

然后将其与绑定一起使用 @EnableBinding(CustomKafkaStreamsProcessor.class)

这样,您不必更改返回值的方法。