我正在运行 kafka 2.13-2.4.1 并配置用 java 编写的 kafka 客户端(消费者)和 kafka 集群(3 个节点,每个节点有一个代理)之间的 SSL 连接。我通过Confluence 的文档使用了官方文档,该文档具有单向身份验证(客户端没有证书),但它不起作用,所以我不得不使用两种身份验证,然后消费者和生产者控制台都通过 SSL 进行良好的通信,但是当我使用我的java消费者应用程序时:
package kafkaconsumerssl;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KafkaConsumerSSLTest {
public static void main(String[] args) throws KafkaException {
Properties props = new Properties();
props.put("security.protocol", "SSL");
props.put("ssl.endpoint.identification.algorithm=", "");
props.put("ssl.truststore.location","/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password","*******");
props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "********");
props.put("ssl.key.password", "*******");
props.put("acks", "all");
props.put("retries", "0");
props.setProperty("zk.connnect", "172.31.32.219:2181,172.31.41.226:2181,172.31.33.133:2181");
props.setProperty("group.id", "ConsumersTest");
props.setProperty("auto.offset.reset","earliest");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); …Run Code Online (Sandbox Code Playgroud) java ssl apache-kafka kafka-consumer-api apache-kafka-security