我们计划将 AWS MSK 服务用于托管 Kafka 和架构注册表以及 Confluence 的 Kafka Connect 服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在 EC2 中运行架构注册表和连接器。
根据 Confluence 团队的说法,如果我们对 Kafka 使用 MSK,他们将无法正式支持 Confluence Schema Registry 和 Kafka Connect。
那么,有谁可以分享一下他们的经验吗?就像 Anybuddy 在生产环境中组合使用 MSK 和 Confluence 服务一样吗?
使用这种组合有风险吗?
是否推荐使用这种组合?
如果我们遇到连接器方面的任何问题,Confluence 社区如何提供支持?
还有其他建议、意见或替代方案吗?
我们已经拥有 Confluence 企业平台许可证,但我们希望拥有托管 Kafka 服务,这就是我们选择 AWS MKS 的原因,因为根据我们的分析,它比 Confluence Cloud 非常经济高效?
请分享您的想法并提前致谢。
谢谢
apache-kafka confluent-schema-registry confluent-cloud aws-msk confluent-platform
我正在评估 AWS Kinesis 与托管服务 Kafka (MSK)。我们的要求是从本地系统(使用 C++ 开发的系统)向 AWS 发送一些消息 (JSON)。然后我们需要将上面的消息持久化到像 PostgreSQL 这样的关系数据库中,同时我们需要将上面的数据流式传输到托管在 AWS 中的其他一些微服务(java)中。
我有以下疑问:
i) 如何从我的本地系统访问(连接和发送消息)到 AWS Kinesis?是否有任何 C++ API 支持?(有 java 客户端 API,但我们的本地系统是用 C++ 编写的)
ii) 如何从我的本地系统访问(连接和发送消息)到 AWS MSK?
iii) 是否可以将 MSK 与其他 AWS 服务(例如 lambda、Redshift、EMR 等)集成?
iv) 要将数据保存到数据库中,我们可以使用 AWS lambda 吗?(AWS Kinesis 支持该功能,AWS MSK 怎么样)
v) 我们的消息速率是 50msg/秒,什么是具有成本效益的解决方案?
有谁知道 AWS MSK(Kafka 托管流)是否支持 KSQL?我在 Confluence 平台上看过很多有关 KSQL 的视频和文档,但没有看过 AWS MSK 的视频和文档。
请告诉我它是否支持,并告诉我是否有关于 AWS MSK 中的 KSQL 设置的任何文档。
谢谢,巴拉
我正在使用 AWS MSK 并且我想启用 ACL,但是当 ACL 开启时我无法创建主题。我正在使用命令行工具进行所有操作。这是我正在做的事情的总结:
所以问题是该主题是在 Zookeeper 上创建的,但代理无法访问它。大概是由于我遗漏了一些 ACL 规则。
我运行的命令的原始输出:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: …Run Code Online (Sandbox Code Playgroud) amazon-web-services apache-kafka apache-zookeeper kafka-topic aws-msk
我正在评估AWS Managed Service Kafka(MSK),并且我知道当前它处于预览模式,因此可能没有所有功能或适当的文档。我尝试设置msk集群,并正在验证msk是否可以满足公司的所有用例/需求,但是目前,它缺少文档和示例。
https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html
我有以下查询:
i)如何使用在本地系统上运行的Kafka客户端访问AWS MSK?
ii)MSK是否支持架构演变并且仅支持一次语义?
iii)MSK将提供某种方式来更新某些群集或调整配置吗?就像AWS胶水一样,它们在托管环境中为火花执行器和驱动程序内存提供参数更改。
iv)是否可以将MSK与其他AWS服务(例如Redshift,EMR等)集成?
v)我可以通过ksql将流式SQL与MSK一起使用吗?如何使用MSK设置KSQL?
vi)如何对流经MSK的数据进行实时预测分析?
vii)与来自Azure / confluent的其他基于云的kafka群集相比,MSK的可靠性如何?与香草kafka相比,任何性能基准如何?集群中可以启动的最大经纪人数量是多少?
我有带有 2 个代理的 AWS MSK Kafka 集群。从日志中我可以看到(在每个经纪人上)他们正在不断地重新平衡。每分钟我都可以在日志中看到:
Preparing to rebalance group amazon.msk.canary.group.broker-1 in state PreparingRebalance with old generation 350887 (__consumer_offsets-21) (reason: Adding new member consumer-amazon.msk.canary.group.broker-1-27058-8aad596f-b00d-428a-abaa-f3a28d714f89 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
Run Code Online (Sandbox Code Playgroud)
25 秒后:
Preparing to rebalance group amazon.msk.canary.group.broker-1 in state PreparingRebalance with old generation 350888 (__consumer_offsets-21) (reason: removing member consumer-amazon.msk.canary.group.broker-1-27058-8aad596f-b00d-428a-abaa-f3a28d714f89 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
Run Code Online (Sandbox Code Playgroud)
为什么会出现这种情况?是什么原因造成的?而什么是amazon.msk.canary.group.broker-1消费群体呢?
我已经使用 aiokafka 编写了一个 python 脚本,用于从 AWS MSK 中的 Kafka 集群生成和使用,我从与我的集群位于同一 VPC 中的 EC2 实例运行该脚本,当我尝试将我的脚本连接到cluster 它拒绝接受连接:
剧本
from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys
async def consume():
bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
topic = os.environ.get('TOPIC', 'demo')
group = os.environ.get('GROUP_ID', 'demo-group')
consumer = AIOKafkaConsumer(
topic, bootstrap_servers=bootstrap_server, group_id=group
)
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def …Run Code Online (Sandbox Code Playgroud) 使用kafka-python包构建 KafkaProducer 时出现此错误:
[ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Traceback (most recent call last):
File "/var/lang/lib/python3.7/imp.py", line 234, in load_module
return load_source(name, filename, file)
File "/var/lang/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/var/task/kafka/producer/kafka.py", line 381, in __init__
**self.config)
File "/var/task/kafka/client_async.py", line 240, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/var/task/kafka/client_async.py", line 908, …Run Code Online (Sandbox Code Playgroud) 当生产者首次向主题发布消息时,我们的AWS MSK集群不会自动创建主题。这令人惊讶,因为根据https://kafka.apache.org/documentation/#brokerconfigs的默认行为应具有auto.create.topics.enable = true。
我试图在我的代理上设置此配置,但由于配置read-only用于动态更新,因此无法执行。
$ kafka-configs --bootstrap-server $KAFKA_BROKER --entity-type brokers --entity-default --alter --add-config auto.create.topics.enable=true
Error while executing config command with args '--bootstrap-server $KAFKA_BROKER --entity-type brokers --entity-default --alter --add-config auto.create.topics.enable=true'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(auto.create.topics.enable)
Run Code Online (Sandbox Code Playgroud)
注意:我$KAFKA_BROKER在输出中替换了我们经纪人的IP
如何配置AWS MSK Kafka集群以启用主题的自动创建?
我正在尝试使用 AWS 托管 Kafka 实例 (MSK) 创建 Kafka 客户端应用程序(生产者和消费者)。此外,代理到代理的通信以及客户端到代理的通信通过集群中的 TLS 配置为安全。CA 是 AWS 私有 CA,因为这是 MSK 支持的唯一一种通过 TLS 进行客户端到代理身份验证的 CA。
问题上下文:AWS官方文档(https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html#msk-authentication-client)步骤更倾向于Java世界并处理客户端信任库和密钥库为 jks。但是.Net客户端不使用Java JKS容器格式(https://github.com/mhowlett/confluence-kafka-dotnet/tree/security/examples/Security)。
服务器验证客户端:这部分我能够解决。由于 jks 只是一个数据存储,因此在按照上述 aws 文档创建密钥库后,我运行了一些额外的 keytool 和 openssl 命令来显式提取客户端证书和密钥。我能够使用它成功地生成和使用消息。
但是,为了让客户端验证服务器,我需要将 ssl.ca.location 设置为 CA 根证书。从私有 CA(用作 MSK 实例的 CA)我已经下载了根 CA,默认情况下为 pem 格式(注意:这仅包含证书详细信息,不包含密钥详细信息)。使用以下命令将其转换为 .crt:
add pem to truststore : keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file Certificate.pem
get cert from truststore : keytool -export -alias CARoot -keystore kafka.client.truststore.jks -rfc -file ca-root.crt
使用上面的 ca-root.crt 作为 ca …
aws-msk ×10
apache-kafka ×9
python ×2
.net ×1
aiokafka ×1
kafka-python ×1
kafka-topic ×1
ksqldb ×1