This is my first post on Stack Overflow; I hope I haven't picked the wrong section.
Context:
The Kafka heap size is configured in the following file:
/etc/systemd/system/kafka.service
using the following parameter:
Environment="KAFKA_HEAP_OPTS=-Xms6g -Xmx6g"
The OS is "CentOS Linux release 7.7.1908".
Kafka is "confluent-kafka-2.12-5.3.1-1.noarch", installed from the following repository:
# Confluent REPO
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/5.3/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/5.3/archive.key
enabled=1
A few days ago, I enabled SSL on the 3-machine Kafka cluster, and suddenly the following command stopped working:
kafka-topics --bootstrap-server <the.fqdn.of.server>:9093 --describe --topic <TOPIC-NAME>
It returns the following error:
[2019-10-03 11:38:52,790] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1':(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at …
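One common way to end up at exactly this allocation failure is a plaintext client misreading the TLS handshake bytes on a TLS port as a huge message length. To show what the client side of a 9093 TLS listener usually needs, here is a minimal, hedged sketch of the same describe call through the Java AdminClient; the truststore path, password and topic name are placeholders, not taken from the question.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class DescribeTopicOverSsl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Point at the TLS listener and tell the client to actually speak SSL;
        // without security.protocol=SSL the client talks PLAINTEXT to the TLS port.
        props.put("bootstrap.servers", "the.fqdn.of.server:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder path
        props.put("ssl.truststore.password", "changeit");                       // placeholder password

        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of: kafka-topics --bootstrap-server <host>:9093 --describe --topic <topic>
            System.out.println(admin.describeTopics(Collections.singletonList("my-topic"))
                    .all().get());
        }
    }
}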
I am trying to connect to a Kafka broker as a consumer over SSL, using the command-line tools with Kafka version 0.11.0.3. The command is as follows:
kafka-console-consumer.bat \
--bootstrap-server host:port \
--topic topicName \
--from-beginning \
--group groupId \
--consumer.config ssl.properties
The ssl.properties file:
security.protocol=SSL
ssl.truststore.location=path/to/truststore.jks
ssl.truststore.password=1234567
ssl.keystore.location=path/to/keystore.jks
ssl.keystore.password=1234567
ssl.key.password=1234567
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.keystore.type=JKS
The exception I get:
[2020-04-28 17:24:39,522] ERROR [Consumer clientId=consumer-1, groupId=binomix] Connection to node -1 (/178.208.149.84:9301) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2020-04-28 17:24:39,524] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1582)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:544)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1216) …

When I run ZooKeeper from the kafka_2.12-2.3.0 package with the following commands, I get an error:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/kafka_2.12-2.3.0/config/zookeeper_jaas.conf"
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
and zookeeper_jaas.conf is:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
and the zookeeper.properties file is:
server=localhost:9092
#server=localhost:2888:3888
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="ibm" password="ibm-secret";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/kafka/apache-zookeeper-3.5.5-bin/zookeeperkeys/client.truststore.jks
ssl.truststore.password=test1234
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
Can anyone suggest what the cause might be?
I am trying to configure a Kafka client to authenticate against a secured Kafka server. I have set up the JAAS and SSL configuration, but it complains about serviceName.
I am not using Kerberos.
The command:
KAFKA_OPTS="-Djava.security.auth.login.config=./jaas.conf" \
kafka-console-producer --broker-list k0:9092,k1:9092,k2:9092 \
--topic test-topic
--producer.config ./ssl.properties
The error:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>
[ ... ]
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
The JAAS config file:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
password="broker-secret"
user_broker="broker-secret"
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
confluent.metrics.reporter.sasl.mechanism=PLAIN
user_username1="password1";
};
ssl.properties
bootstrap.servers=k0:9092,k1:9092,k2:9092
security.protocol=SASL_PLAINTEXT
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent
producer.bootstrap.servers=k0:9092,1:9092,k2:9092
producer.security.protocol=SASL_PLAINTEXT
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/var/ssl/private/client.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent
org.apache.kafka.common.security.plain.PlainLoginModule required
password="broker-secret"
user_broker="broker-secret" …
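A side note on the serviceName complaint: serviceName is normally a Kerberos/GSSAPI setting, and when no sasl.mechanism is supplied the Java clients default to GSSAPI. Below is a minimal, hedged sketch of a PLAIN (non-Kerberos) producer configured entirely through client properties; the broker list, topic and credentials are echoed from the question or are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainSaslProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "k0:9092,k1:9092,k2:9092");
        props.put("security.protocol", "SASL_PLAINTEXT"); // or SASL_SSL if the listener is TLS
        props.put("sasl.mechanism", "PLAIN");             // explicit, so the client does not fall back to GSSAPI
        // The JAAS entry can be passed as a client property instead of an external jaas.conf file.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"username1\" password=\"password1\";"); // placeholder credentials
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "hello"));
        }
    }
}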
I am using librdkafka as the consumer client, and I have configured both the broker and the client to support SSL. For the broker:
listeners = PLAINTEXT://172.20.54.9:9092,SSL://172.20.54.9:9093
ssl.keystore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.keystore.jks
ssl.keystore.password=ismail
ssl.key.password=ismail
ssl.truststore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.truststore.jks
ssl.truststore.password=password
On the client side I use this configuration:
rd_kafka_conf_set(conf, "metadata.broker.list", "172.20.54.9:9093",
NULL, 0);
rd_kafka_conf_set(conf, "security.protocol", "ssl",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.ca.location", "/usr/bin/NetSens/CARoot.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.certificate.location", "/usr/bin/NetSens/certificate.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.location", "/usr/bin/NetSens/key.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.password", "password",
NULL, 0);
I get this error:
1559309856.897 RDKAFKA-3-ERROR: rdkafka#consumer-1: [thrd:ssl://172.20.54.9:9093/bootstrap]: ssl://172.20.54.9:9093/bootstrap: SSL handshake failed: ../ssl/record/ssl3_record.c:252: error:1408F10B:SSL routines:ssl3_get_record:wrong version number: (after 7ms in state CONNECT)
For additional context, I have another Python Kafka client that uses the same certificate and key, and it works fine. I would appreciate any help.
When we connect to a Kafka cluster, we define certain properties in the Java client.
Example producer properties:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Example consumer properties:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Similarly, when connecting to a Kafka cluster secured with SSL authentication, should any SSL-related properties (such as the keystore and truststore paths) be specified here as well?
Could someone elaborate and explain how a Java client connects to a secure Kafka cluster?
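To make that concrete, here is a minimal, hedged sketch of the SSL-related properties a Java consumer typically adds on top of the settings shown above; the port, store paths, passwords and topic name are placeholders, and the keystore entries are only needed when the brokers require client certificates.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SslConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The usual consumer settings, as in the examples above.
        props.put("bootstrap.servers", "localhost:9093");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SSL-specific settings: switch the protocol and point at the trust/key stores.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "changeit");                        // placeholder
        // Only needed for mutual TLS (broker-side ssl.client.auth=required):
        props.put("ssl.keystore.location", "/path/to/client.keystore.jks");      // placeholder
        props.put("ssl.keystore.password", "changeit");                          // placeholder
        props.put("ssl.key.password", "changeit");                               // placeholder

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.value()));
        }
    }
}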
ssl-certificate apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-security
I am running Kafka 2.13-2.4.1 and configuring an SSL connection between a Kafka client (a consumer written in Java) and a Kafka cluster (3 nodes, one broker per node). I followed the official documentation from Confluent, which uses one-way authentication (no certificate on the client), but it did not work, so I had to switch to two-way authentication; after that both the console consumer and console producer communicate fine over SSL. However, when I use my Java consumer application:
package kafkaconsumerssl;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KafkaConsumerSSLTest {
public static void main(String[] args) throws KafkaException {
Properties props = new Properties();
props.put("security.protocol", "SSL");
props.put("ssl.endpoint.identification.algorithm=", "");
props.put("ssl.truststore.location","/var/private/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password","*******");
props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "********");
props.put("ssl.key.password", "*******");
props.put("acks", "all");
props.put("retries", "0");
props.setProperty("zk.connnect", "172.31.32.219:2181,172.31.41.226:2181,172.31.33.133:2181");
props.setProperty("group.id", "ConsumersTest");
props.setProperty("auto.offset.reset","earliest");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); …Run Code Online (Sandbox Code Playgroud) java ssl apache-kafka kafka-consumer-api apache-kafka-security
The SASL mechanism we use is SCRAM-SHA-256, but the Kafka producer only accepts a sasl_mechanism of PLAIN, GSSAPI, or OAUTHBEARER.
The configuration below raises this error:
sasl_mechanism must be in PLAIN, GSSAPI, OAUTHBEARER
The configuration:
ssl_produce = KafkaProducer(bootstrap_servers='brokerCName:9093',
security_protocol='SASL_SSL',
ssl_cafile='pemfilename.pem',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='password',
sasl_plain_password='secret')
I need to know how to specify the correct SASL mechanism.
Thanks.
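For comparison only, since the client in this question is kafka-python: the Java client accepts SCRAM directly through sasl.mechanism plus a ScramLoginModule JAAS line, which may help when mapping the property names. A hedged sketch with placeholder credentials and a placeholder JKS truststore (the question itself uses a PEM CA file):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ScramSaslSslProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "brokerCName:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"user\" password=\"secret\";");      // placeholder credentials
        props.put("ssl.truststore.location", "/path/to/truststore.jks");   // placeholder
        props.put("ssl.truststore.password", "changeit");                  // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "hello"));    // placeholder topic
        }
    }
}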
python ssl apache-kafka kafka-producer-api apache-kafka-security