用例:
我正在使用 Spring Boot2.2.5.RELEASE并且Kafka 2.4.1
JAAS/SASL 配置在 Kafka/ZooKeeper 上正确完成,因为创建的主题没有问题kafka-topics.bat
问题:
当我启动 Spring Boot 应用程序时,我立即收到以下错误:
kafka-server-start.bat 控制台:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
IDE控制台:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected
我的application.properties配置:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
Run Code Online (Sandbox Code Playgroud)
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="12345"
user_admin="12345"
user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Run Code Online (Sandbox Code Playgroud)
我错过了什么吗?
提前致谢。
jum*_*key 19
我在错误的地方定义了属性,即在application.properties. 由于我有 ProducerFactory 和 ConsumerFactory bean,这些application.properties将被 Spring Boot忽略。
在 beans 定义中配置相同的属性解决了该问题,即将属性从 定义 beans 的位置application.properties 移动。
这是一个例子:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
));
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
));
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=username" +
"password=password;");
return new KafkaAdmin(configs);
}
Run Code Online (Sandbox Code Playgroud)
小智 5
@jumping_monkey 提供的答案是正确的,但是我不知道将这些配置放在ProducerFactory& ConsumerFactorybeans 中的什么位置,因此我将在下面留下示例以供想知道的人使用:
-分别在您的ProducerConfig或ConsumerConfigBeans 中(我的名为generalMessageProducerFactory):
@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put("sasl.mechanism", "PLAIN");
configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
configProps.put("security.protocol", "SASL_SSL");
return new DefaultKafkaProducerFactory<>(configProps);
}
Run Code Online (Sandbox Code Playgroud)
还有在你的TopicConfigurationClass inkafkaAdmin方法中:
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
configs.put("security.protocol", "SASL_SSL");
return new KafkaAdmin(configs);
}
Run Code Online (Sandbox Code Playgroud)
希望这是有帮助的家伙!
| 归档时间: |
|
| 查看次数: |
7138 次 |
| 最近记录: |