为Kafka客户端启用SSL

Moh*_*bal 7 apache-kafka

Kafka允许客户端通过SSL连接.默认情况下,SSL已禁用,但我通过引用以下链接启用了该功能. http://docs.confluent.io/2.0.0/kafka/ssl.html

完成所有配置后,Producer/Consumer无法生成/使用该消息.

    [2016-02-29 09:20:49,189] ERROR Error when sending message to topic ssltopic with key: null, value: 2 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
sas
[2016-02-29 09:21:16,031] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Connection reset by peer
Run Code Online (Sandbox Code Playgroud)

Sai*_*dam 7

supermonk的上述答案澄清了大多数要检查的地方.我遇到了与OP类似的问题,错误不在代理配置中,而是客户端配置.
在官方文档中,虽然他们暗中提到将client.keystore创建为步骤1,但我错过了使用CA签署证书,就像为server.keystore所做的那样.这导致Kafka经纪人拒绝来自客户(生产者/消费者)的连接.

执行这两个步骤已经消除了我的问题.

keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD

keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
Run Code Online (Sandbox Code Playgroud)

这将使用CA证书对证书进行签名,并将CARoot和签名证书添加到client.keystore.

参考:关于保护Apache Kafka的汇总博客


小智 6

以下是一些验证步骤.由于错误日志不详细,您是否可以尝试以下步骤来验证设置是否良好.

  1. 要验证服务器的keystoretruststore被正确设置,你可以运行下面的命令:

    openssl s_client -debug -connect localhost:9093 -tls1注意:TLSv1应列在ssl.enabled.protocols下.

在此命令的输出中,您应该看到服务器的证书:

-----BEGIN CERTIFICATE-----
{variable sized random bytes} 
-----END CERTIFICATE----- 
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith   issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com 
Run Code Online (Sandbox Code Playgroud)

如果证书未显示或者是否有任何其他错误消息,则说明您keystore的设置不正确.

  1. 检查server.properties

    echo "############################# Security  #############################" >>server.properties
    echo "listeners=SSL://:9093" >>server.properties
    echo "security.inter.broker.protocol=SSL" >> server.properties
    echo "ssl.client.auth=required" >> server.properties
    echo "ssl.keystore.location=/home/vagrant/securityDemo/kafka.server.keystore.jks" >> server.properties
    echo "ssl.keystore.password=test1234" >> server.properties
    echo "ssl.key.password=test1234" >> server.properties
    echo "ssl.truststore.location=/home/vagrant/securityDemo/kafka.server.truststore.jks" >> server.properties
    echo "ssl.truststore.password=test1234" >> server.properties
    echo "ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1" >> server.properties
    echo "authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer" >>, if acl enabled ** server.properties
    
    Run Code Online (Sandbox Code Playgroud)
  2. 确保只有一个CA根.

参考文献1.参考文献2.