Spring Cloud Stream Kafka 多重绑定

jav*_*eek 6 spring-boot spring-cloud spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka

我正在使用 Spring Cloud Stream Kafka 绑定器来消费来自 Kafka 的消息。我能够使我的示例与单个 Kafka Binder 一起工作,如下所示

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <broker url>
      bindings:
        consumer:
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-other-topic
          producer:
            valueSerde: JsonSerde
Run Code Online (Sandbox Code Playgroud)

请注意,此处两个绑定都指向同一个 Kafka Broker。但是,我遇到了一种情况,我需要发布到某个 Kafka 集群中的一个主题,并且还需要从不同 Kafka 集群中的另一个主题进行消费。我应该如何更改配置才能绑定到不同的 Kafka 集群?

我尝试过这样的事情

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          binder: defaultbinder
          destination: some-topic
          producer:
            valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>
Run Code Online (Sandbox Code Playgroud)

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      kafka:
        bindings:
          consumer:
            binder: kafka1
            destination: some-topic
            group: testconsumergroup
            consumer:
              concurrency: 1
              valueSerde: JsonSerde
          producer:
            binder: defaultbinder
            destination: some-topic
            producer:
              valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>
Run Code Online (Sandbox Code Playgroud)

但两者似乎都不起作用。第一个配置似乎无效。对于第二个配置,我收到以下错误

Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka1,defaultbinder, and no default binder has been set.
Run Code Online (Sandbox Code Playgroud)

我正在使用依赖项 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE' 和 Spring Boot 2.2.6

请让我知道如何使用 Spring Cloud Stream 为 Kafka 配置多个绑定

更新

尝试了下面这个配置

spring:
  cloud:
    stream:
      binders:
        kafka2:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
      bindings:
        consumer:
          destination: <some-topic>
          binder: kafka1
          group: testconsumergroup
          content-type: application/json
          nativeEncoding: true
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-topic
          binder: kafka2
          contentType: application/json
          nativeEncoding: true
          producer:
            valueSerde: JsonSerde
Run Code Online (Sandbox Code Playgroud)

消息流和EventHubBinding如下

public interface MessageStreams {
  String PRODUCER = "producer";
  String CONSUMER = "consumer;

  @Output(PRODUCER)
  MessageChannel producerChannel();

  @Input(CONSUMER)
  SubscribableChannel consumerChannel()
}

@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}
Run Code Online (Sandbox Code Playgroud)

我的 Producer 类如下所示

@Component
@Slf4j
public class EventPublisher {
  private final MessageStreams messageStreams;

  public EventPublisher(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  public boolean publish(CustomMessage event) {
    MessageChannel messageChannel = getChannel();
    MessageBuilder messageBuilder = MessageBuilder.withPayload(event);
    boolean messageSent = messageChannel.send(messageBuilder.build());
    return messageSent;
  }

  protected MessageChannel getChannel() {
    return messageStreams.producerChannel();
  }
}
Run Code Online (Sandbox Code Playgroud)

Consumer 类如下所示

@Component
@Slf4j
public class EventHandler {
  private final MessageStreams messageStreams;

  public EventHandler(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  @StreamListener(MessageStreams.CONSUMER)
  public void handleEvent(Message<CustomMessage> message) throws Exception 
  {
    // process the event
  }

  @Override
  @ServiceActivator(inputChannel = "some-topic.testconsumergroup.errors")
  protected void handleError(ErrorMessage errorMessage) throws Exception {
    // handle error;
  }
}
Run Code Online (Sandbox Code Playgroud)

我在尝试发布和使用测试中的消息时收到以下错误。

Dispatcher has no subscribers for channel 'application.producer'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[104], headers={contentType=application/json, timestamp=1593517340422}]
Run Code Online (Sandbox Code Playgroud)

我错过了什么吗?对于单个集群,我能够发布和使用消息。该问题仅发生在多个集群绑定时