Multiple @EnableBinding with Kafka Spring Cloud Stream

Joh*_*ler 1 apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

I am trying to set up a Spring Boot application that listens to Kafka.

I am using the Kafka Streams binder.

With a single @EnableBinding:

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

And in application.yml:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

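everything works fine.

However, if I add a second class with a different binding, the following error occurs:

The following subscribed topics are not assigned to any members: [mytopic1]

The second binding looks like this: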

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

What am I missing? Can't I use multiple input topics and multiple outputs in the same application? Is it something related to application.name?

sob*_*cko 5

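I just tried this in a sample application and it worked. When you have multiple processors in the same application, you need to make sure that each processor has its own application ID. See below how I set two different application IDs for the two inputs in application.yml.

With that in place, I saw both processors logging to the console, and I also saw the messages arrive on the output topics.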

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

The relevant part of application.yml:

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
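For reference, here is a sketch of how the same fix might look when folded into the nested application.yml from the question. Kafka Streams uses the application ID as the consumer group ID, so giving each input binding its own ID keeps the two processors in separate groups. The IDs process-1 and process-2 are simply the ones used above. The `group` entries from the question are left out because the configuration above does not set them, and the default serde settings are omitted on the assumption that the question's single-binding setup already worked without them. Treat this as an untested starting point rather than a verified configuration.

```yaml
spring:
  application:
    name: stream_s1000_app
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
          bindings:
            # One application ID per input binding, so each processor
            # runs under its own Kafka Streams application.
            input_1:
              consumer:
                application-id: process-1
            input_2:
              consumer:
                application-id: process-2
      bindings:
        input_1:
          destination: mytopic1
        output_1:
          destination: mytopic2
        input_2:
          destination: mytopic3
        output_2:
          destination: mytopic4
```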

  • See "applicationId" in this section: http://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RELEASE/single/spring-cloud-stream-binder-kafka.html#_kafka_streams_properties (2 upvotes)
  • You are right, thank you. I think the documentation should be clearer about this... The Spring Cloud Stream documentation I was reading (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_streams_properties) does not mention it. Thanks a lot! (2 upvotes)