小编Don*_*ley的帖子

如何在默认情况下从 Kafka Spring Cloud Stream 消费并同时消费由 Confluent API 生成的 Kafka 消息?

我正在构建一个微服务组件,它将默认使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。

但是我还需要从使用融合 API 的其他组件中使用 Kafka 消息。

我有一个示例存储库,显示了我正在尝试做什么。

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group
Run Code Online (Sandbox Code Playgroud)

通过上述配置,我使用 SCS 默认配置创建了两个使用者。例如,类 org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。

如果我删除上述配置中的注释,我会从我的 Confluent 客户端发送配置的两个消费者。例如,类 io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。

我理解因为配置在 Kafka binder …

java spring apache-kafka spring-cloud-stream

5
推荐指数
1
解决办法
1386
查看次数

标签 统计

apache-kafka ×1

java ×1

spring ×1

spring-cloud-stream ×1