AWS MSK - Timeout when creating a Kafka topic with ACLs enabled

Ara*_*raf 8 amazon-web-services apache-kafka apache-zookeeper kafka-topic aws-msk

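I'm using AWS MSK and I want to enable ACLs, but I can't create topics once ACLs are turned on. I'm using the command-line tools for everything. Here is a summary of what I'm doing: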

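  • Create a new cluster
  • Create a topic - this works fine
  • Add an ACL for client1 on resource=CLUSTER with operation=ALL
  • Create a topic using the AdminClient (by passing the --bootstrap-server option) - this produces a timeout exception
  • Retry creating the same topic - this gives an error saying the topic already exists
  • List topics using the AdminClient - this returns no topics
  • Create a topic using the Zookeeper connection - this works
  • List topics using the Zookeeper connection - this returns all the topics I created (even the ones that timed out)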

So the problem seems to be that the topic is created in Zookeeper but the brokers cannot access it, presumably because I'm missing some ACL rules.
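The commands in this question pass --command-config ~/client1.properties, but the file itself is not shown. As a rough sketch only, a client properties file for TLS mutual authentication might be written like this. The configuration keys are standard Kafka client settings; the CLIENT_PROPS variable, the paths, and the passwords are placeholders and not the actual values used here.

```shell
# Write a hypothetical client properties file for TLS mutual authentication.
# CLIENT_PROPS, the store paths, and the passwords are all placeholders.
CLIENT_PROPS="${CLIENT_PROPS:-$(mktemp)}"

cat > "$CLIENT_PROPS" <<'EOF'
security.protocol=SSL
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.keystore.location=/path/to/kafka.client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
EOF

echo "wrote $CLIENT_PROPS"
```

With this kind of setup, the principal that the ACLs must name (User:CN=client1.com in the listing further down) comes from the subject of the client certificate in the keystore.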

Raw output of the commands I ran:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
 (kafka.admin.TopicCommand$)

Running the same command again:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
 (kafka.admin.TopicCommand$)

Listing topics via the AdminClient:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list



Listing topics via the Zookeeper connection:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5

Here are my ACL rules:

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

What am I missing?

Dud*_*001 4

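I don't think this has anything to do with AWS MSK; it is a matter of how your secured Kafka cluster is configured. In a secured cluster, both client (consumer/producer) operations and inter-broker operations need to be authorized. You would hit the same problem on a non-managed Kafka cluster.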

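The recommended approach is to set up a "super user" on the server (I'd call it a service account) and then give that user the ACLs that allow the inter-broker interactions the cluster needs. The exact ACLs you need will vary with your use case and security preferences.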

In server.properties you'd add an entry like super.users=User:BrokerService. This is documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation uses Alice and Bob as superuser names, which seems confusing to me. Pick whatever user name makes sense for you.
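As a small sketch of what that entry looks like with more than one principal: Kafka reads super.users as a semicolon-separated list (semicolons rather than commas, since a certificate distinguished name can itself contain commas). The helper name super_users_line is made up for this illustration, and the principals are just the placeholder from this answer plus the client from the question.

```shell
# Hypothetical helper: print a super.users line for server.properties.
# Principals are joined with ';', the separator Kafka expects for this setting.
super_users_line() {
  # The subshell keeps the IFS change from leaking into the caller.
  ( IFS=';'; printf 'super.users=%s\n' "$*" )
}

# Placeholder principals; substitute the ones that fit your cluster.
super_users_line "User:BrokerService" "User:CN=client1.com"
```

The printed line is what you would paste into server.properties. Whether a managed service lets you change this broker setting is a separate question that this sketch does not answer.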

Then you need to set up a similar ACL whose principal is the "super user" you created above, e.g. principal=User:BrokerService. The ACL would grant whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ on all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you exactly what you need without more information about what you want to do.

For example, this command sets up the ACL:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
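If granting All feels too broad, a narrower variant might look like the sketch below. It assumes the same placeholder principal and a ZooKeeper address supplied through a ZK variable, and it grants only Read on all topics plus ClusterAction on the cluster resource, which is the operation Kafka checks for inter-broker requests. Whether these two are sufficient for a given cluster is not something this sketch can confirm. The run wrapper is a made-up helper that prints each command by default and only executes it when RUN=1.

```shell
# Dry run by default: each command is printed, not executed.
# Set RUN=1 to actually run the commands against a cluster.
ZK="${ZK:-localhost:2181}"                    # assumed ZooKeeper connect string
PRINCIPAL="${PRINCIPAL:-User:BrokerService}"  # placeholder principal from this answer

run() {
  if [ "${RUN:-0}" = "1" ]; then
    "$@"
  else
    # Note: the printed form drops the shell quoting around arguments such as '*'.
    printf '%s\n' "$*"
  fi
}

# Read access on every topic.
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --add \
  --allow-principal "$PRINCIPAL" --operation Read --topic '*'

# ClusterAction on the cluster resource, used for inter-broker requests.
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --add \
  --allow-principal "$PRINCIPAL" --operation ClusterAction --cluster
```

Afterwards, kafka-acls with --list in place of --add shows which rules were actually stored, which is a quick way to confirm that each option was parsed the way you intended.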

More options for setting up the ACLs, and a description of your exact problem, are documented here: https://docs.confluent.io/current/kafka/authorization.html#acl-format

Again, please research some more, or edit your question if you are looking for an exact configuration to use here, as the ACLs you choose have security and use-case implications.

  • Is there an option to do something similar for the SASL/SCRAM authentication method? (2 upvotes)