在设置Kafka Broker集群并创建几个主题后,我们发现Kafka自动创建了以下两个主题:
这些主题的重要性和用途是什么?
我在Spark Structured Streaming中使用Kafka Source来接收Confluent编码的Avro记录.我打算使用Confluent Schema Registry,但是与spark结构化流媒体的集成似乎是不可能的.
我已经看到了这个问题,但无法使用Confluent Schema Registry.使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)
avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming
我正在尝试使用Confluent kafka-avro-console-consumer,但是如何将Schema Registry的参数传递给它?
我正在评估 kinesis 作为 kafka 的替代品。我缺少的一件事是模式注册表等效解决方案。我特别需要:
处理上述 2 的选项是什么?我发现的唯一一件事是胶水目录,但似乎并没有
最后我也想使用 firehose(输出到 redshift),但据我所知,这是不可能的,需要编写自定义 lambda。
我们计划将 AWS MSK 服务用于托管 Kafka 和架构注册表以及 Confluence 的 Kafka Connect 服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在 EC2 中运行架构注册表和连接器。
根据 Confluence 团队的说法,如果我们对 Kafka 使用 MSK,他们将无法正式支持 Confluence Schema Registry 和 Kafka Connect。
那么,有谁可以分享一下他们的经验吗?就像 Anybuddy 在生产环境中组合使用 MSK 和 Confluence 服务一样吗?
使用这种组合有风险吗?
是否推荐使用这种组合?
如果我们遇到连接器方面的任何问题,Confluence 社区如何提供支持?
还有其他建议、意见或替代方案吗?
我们已经拥有 Confluence 企业平台许可证,但我们希望拥有托管 Kafka 服务,这就是我们选择 AWS MKS 的原因,因为根据我们的分析,它比 Confluence Cloud 非常经济高效?
请分享您的想法并提前致谢。
谢谢
apache-kafka confluent-schema-registry confluent-cloud aws-msk confluent-platform
当尝试将 record.value() 转换为 java 对象时,我在消费者中遇到了这个异常:
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
Run Code Online (Sandbox Code Playgroud)
生产者发送 java 对象,它是一个名为 的用户定义类型,如下所示:PublicActivityRecord
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
Run Code Online (Sandbox Code Playgroud)
此时我可以在调试模式下看到 的值ProducerRecord确实是类型PublicActivityRecord。
在注册表服务器上,我可以在日志中看到发送模式的生产者的 POST 请求:
Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 …Run Code Online (Sandbox Code Playgroud) java classcastexception avro apache-kafka confluent-schema-registry
我有一个 spring 应用程序,它是我的 kafka 制作人,我想知道为什么 avro 是最好的方法。我阅读了它以及它所提供的所有内容,但是为什么我不能将我用 jackson 创建的 POJO 序列化并将其发送到 kafka?
我这么说是因为 avro 的 POJO 生成不是那么简单。最重要的是,它需要 maven 插件和一个 .avsc 文件。
因此,例如我在我的 kafka 生产者上有一个 POJO,名为 User:
public class User {
private long userId;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
}
Run Code Online (Sandbox Code Playgroud)
我将它序列化并将其发送到我在 kafka 中的用户主题。然后我有一个消费者,它本身有一个 POJO 用户并反序列化消息。是空间问题吗?以这种方式序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。
我正在使用 Kafka 和架构注册表。定义了一个模式,在生产者端使用 confluence 的 KafkaAvroSerializer。一切正常。
另一方面,如果生产者在不遵守架构的情况下发布事件,则发布事件不会出现任何问题。
据了解,Kafka 仅获取序列化的二进制文件,不会检查数据和功能是否按设计工作。
想知道是否有更好的方法来强制执行更强大的模式验证,以便主题不会被不良数据污染?
为了在我的 MSK 主题上支持架构注册表,我找到了两个选项 -
由于 Glue SR 完全由 AWS 管理,我更愿意使用它。但是,我的生产者和消费者客户端是用 python 编写的,这限制了我在 java 中使用 AWS 提供的 SerDe 库。
我继续搜索是否可以使融合模式注册表 API(python 中融合的 kafka 库的一部分)与 Glue 模式注册表一起使用,因为我最初的假设是模式注册表实现在融合和胶水之间是通用的。
在阅读融合文档时,我发现可以通过使用url属性(https://docs.confluent.io/platform/current/clients/confluent-kafka-python/#)为架构注册表提供 url 来建立架构注册表连接。
我无法为我创建的胶水注册表找到这样的 url。他们的 Java 客户端确实接受区域名称、注册表名称和其他属性等属性(https://docs.aws.amazon.com/glue/latest/dg/glue-dg.pdf#schema-registry)。有什么办法可以找到这个 url,或者我应该切换到 confluent SR,这不是托管服务?
amazon-web-services apache-kafka aws-glue confluent-schema-registry
我正在寻找安装融合模式注册表的选项,是否可以单独下载和安装注册表并使其与现有的 kafka 设置一起使用?
谢谢