是否可以在 Apache Kafka Java 客户端中禁用 SSL 证书验证?

Dth*_*Dth 5 java ssl apache-kafka

如果我有一个自签名证书,作为一个好公民,我会将它导入我的密钥库并使用“ssl.truststore.location”和“ssl.truststore.type”配置 Kafka 客户端以便使用它。

如果期望证书主题的通用名称可能与提供它的主机地址不同,我可以使用“ssl.endpoint.identification.algorithm”关闭端点验证。

如果我想完全跳过 SSL 验证,而不仅仅是主机名,这样我就不再需要复制证书怎么办?类似于 curl 中的“-k”或“--insecure”设置。我可以使用 Kafka 的默认 Java 客户端吗?

mar*_*ec3 6

有一种方法可以实现它,但它并不那么简单。

这个想法是实现接口 org.apache.kafka.common.security.auth.SslEngineFactory 将忽略证书验证。当您将其用作客户端时,只需以类似于以下方式的方式实现 createClientSslEngine 方法就足够了:

import org.apache.kafka.common.security.auth.SslEngineFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.Set;

public class InsecureSslEngineFactory implements SslEngineFactory {

    private final TrustManager INSECURE_TRUST_MANAGER = new X509TrustManager() {

        public X509Certificate[] getAcceptedIssuers() {
            return null;
        }

        public void checkClientTrusted(X509Certificate[] certs, String authType) {
            // empty
        }

        public void checkServerTrusted(X509Certificate[] certs, String authType) {
            // empty
        }
    };

    @Override
    public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
        TrustManager[] trustManagers = new TrustManager[]{ INSECURE_TRUST_MANAGER };
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustManagers, new SecureRandom());
            SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
            sslEngine.setUseClientMode(true);
            return sslEngine;
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
        return null;
    }

    @Override
    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
        return false;
    }

    @Override
    public Set<String> reconfigurableConfigs() {
        return null;
    }

    @Override
    public KeyStore keystore() {
        return null;
    }

    @Override
    public KeyStore truststore() {
        return null;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
Run Code Online (Sandbox Code Playgroud)

完成此类后,您只需在 kafka(生产者或消费者)属性中将其配置为 SSL_ENGINE_FACTORY_CLASS:

props.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS, InsecureSslEngineFactory.class);
Run Code Online (Sandbox Code Playgroud)

或者如果您不想使用该常量:

props.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
Run Code Online (Sandbox Code Playgroud)

确保您不在生产中使用此设置!