我已经使用以下命令启动了 Zookeeper 和 Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)
当我尝试获取架构注册表兼容性设置(向后、向前、无)时,我运行了以下curl命令:
curl -X GET http://localhost:8081/config
Run Code Online (Sandbox Code Playgroud)
预期的:
{"compatibility":"BACKWARD"}
Run Code Online (Sandbox Code Playgroud)
结果:
curl: (7) Failed to connect to localhost port 8081: Connection refused
Run Code Online (Sandbox Code Playgroud)
我怎样才能找到要使用哪个端口?
我正在创建一个包含字符串和地图作为字段的 avro 类。我可以通过 maven 生成 avro 类,并且我能够在 localhost:8081 中创建一个注册表
.avsc 文件:
{
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}
Run Code Online (Sandbox Code Playgroud)
模式注册表返回: $ curl -X GET http://localhost:8081/subjects/teste1-value/versions/1
{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}
Run Code Online (Sandbox Code Playgroud)
我的卡夫卡制作人课程是:
public KafkaProducer<String, AvroClass> createKafkaProducer() {
String bootstrapServer = "127.0.0.1:9092";
String schemaRegistryURL = "127.0.0.1:8081";
//create Producer properties
Properties properties = new Properties();
//kafka documentation>producer configs
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);
//create producer
KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
return producer;
} …Run Code Online (Sandbox Code Playgroud) 语境
我编写了几个小型的Kafka Connect连接器。一个每秒仅生成随机数据,另一个每秒将其记录在控制台中。它们与Schema Registry集成在一起,因此可以使用Avro序列化数据。
我使用Landoop提供的fast-data-dev Docker映像将它们部署到本地Kafka环境中
基本设置有效,并每秒产生一条记录的消息
但是,我想更改主题名称策略。默认一生成两个主题:
${topic}-key${topic}-value根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:
${topic}-${keyRecordName}${topic}-${valueRecordName}根据文档,我的需求适合TopicRecordNameStrategy
我尝试了什么
我创建avroData用于发送值进行连接的对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
Run Code Online (Sandbox Code Playgroud)
然后用它来创建SourceRecord响应对象
该文档指出,为了在Kafka Connect中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,将它们添加:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)
问题
连接器似乎忽略了这些属性,并继续使用旧的${topic}-key和${topic}-value主题。
题
Kafka Connect应该支持不同的主题策略。我设法通过编写自己的版本AvroConverter和硬编码来解决此问题,该主题策略是我所需要的。但是,这似乎不是一种好方法,并且在尝试使用Sink …
avro apache-kafka apache-kafka-connect confluent-schema-registry
我正在为带有架构注册表的 java kafka-stream 寻找适当的教程/指南。我有谷歌,但找不到合适的教程。如果有人能帮我找到至少合适的教程,我真的很感激
我可以通过谷歌搜索找到 kafka 流教程。但我正在寻找带有 schemaregistry 的 kafka 流
java avro apache-kafka apache-kafka-streams confluent-schema-registry