Jos*_*fey 4 ssl apache-kafka apache-kafka-connect
在我们的 docker-swarm 中运行 kafka connect,并使用以下 compose 文件:
cp-kafka-connect-node:
image: confluentinc/cp-kafka-connect:5.1.0
ports:
- 28085:28085
secrets:
- kafka.truststore.jks
- source: kafka-connect-aws-credentials
target: /root/.aws/credentials
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_LOG4J_ROOT_LEVEL: TRACE
CONNECT_REST_PORT: 28085
CONNECT_GROUP_ID: cp-kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: dev_cp-kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC: dev_cp-kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: dev_cp-kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_PLUGIN_PATH: /usr/share/java/
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
KAFKA_HEAP_OPTS: '-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2'
deploy:
replicas: 1
resources:
limits:
cpus: '0.50'
memory: 4gb
restart_policy:
condition: on-failure
delay: 10s
max_attempts: 3
window: 2000s
secrets:
kafka.truststore.jks:
external: true
kafka-connect-aws-credentials:
external: true
Run Code Online (Sandbox Code Playgroud)
kafka 连接节点成功启动,我可以设置任务并查看这些任务的状态...
我设置的连接器称为 kafka-sink,我使用以下配置创建它:
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"flush.size": "1",
"schema.compatibility": "NONE",
"tasks.max": "1",
"topics": "input-topic-name",
"s3.part.size": "5242880",
"timezone": "UTC",
"directory.delim": "/",
"locale": "UK",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "kafka-sink",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "my-s3-bucket",
"rotate.schedule.interval.ms": "60000"
}
Run Code Online (Sandbox Code Playgroud)
该任务现在表示它正在运行。
当我没有包含 SSL 配置时,具体来说:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
Run Code Online (Sandbox Code Playgroud)
而是指向一个没有安全性的暴露的引导服务器:
CONNECT_BOOTSTRAP_SERVERS: insecurekafka:9092
Run Code Online (Sandbox Code Playgroud)
它工作正常,从适当的输入主题读取,并使用默认分区输出到 S3 存储桶...
但是,当我使用 SSL 配置针对我的安全 kafka 主题运行它时,它不会记录任何错误,不会引发异常,但尽管数据不断被推送到输入主题,但它什么也不做...
难道我做错了什么?
这是我第一次使用 Kafka Connect,通常,我使用 Spring Boot 应用程序连接到 kafka,您只需在配置中指定信任库位置和密码即可。
我的撰写文件或任务配置中是否缺少某些配置?
我认为您需要为消费者和生产者添加 SSL 配置。检查这里Kafka Connect Encrypt with SSL 类似这样
security.protocol=SSL
ssl.truststore.location=~/kafka.truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=~/kafka.client.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
producer.security.protocol=SSL
producer.ssl.truststore.location=~/kafka.truststore.jks
producer.ssl.truststore.password=<password>
producer.ssl.keystore.location=~/kafka.client.keystore.jks
producer.ssl.keystore.password=<password>
producer.ssl.key.password=<password>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5550 次 |
| 最近记录: |