spring cloud Stream启动流kafka消费者SSL配置

Kir*_*ran 3 spring-cloud-stream

Spring Cloud Stream启动器kafka在连接消费者时不会加载配置。以下是我在调试模式下运行时在控制台中看到的配置:

security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Run Code Online (Sandbox Code Playgroud)

我有 bootstrap yml 文件的以下配置部分

    spring:
     cloud:
      stream:
       bindings:
         <binding configuration>
       kafka:
        binder:
         autoCreateTopics: false
         brokers: <list of kafka brokers>
         defaultBrokerPort: <default port>
        configuration:
         security:
          protocol: SSL
         ssl:
          truststore:
           location: <path to cliend truststore jks>
           password: <password>
           type: JKS
          keystore:
           location: <path to cliend keystore jks>
           password: <password>
           type: JKS
          key:
           password: <password>
          enabled:
           protocols: TLSv1.2,TLSv1.1,TLSv1
Run Code Online (Sandbox Code Playgroud)

谁能告诉我我的配置是否正确?我能够使用 spring-kafka 生产者成功地将消息发布到该主题。在我考虑在 spring kafka 中编写消费者之前,我想确保我做得正确。

sob*_*cko 5

我认为您不能将安全性和协议(例如)放在您提供的示例 yaml 中的两个级别,因为 Kafka 正在寻找诸如 等属性。security.protocol因此ssl.truststore.location,当您创建 yaml 文件时,请提供所有与安全相关的 kafka层次结构中同一级别的属性。否则,spring 将它们作为键/值对。

spring:
     cloud:
      stream:
       bindings:
         <binding configuration>
       kafka:
        binder:
         autoCreateTopics: false
         brokers: <list of kafka brokers>
         defaultBrokerPort: <default port>
        configuration:
         security.protocol: SSL
         ssl.truststore.location: <path to cliend truststore jks>
         ssl.truststore.password: <password>
         ssl.truststore.type: JKS
...
Run Code Online (Sandbox Code Playgroud)