Pel*_*cho 0 avro apache-kafka apache-kafka-connect confluent-schema-registry
语境
我编写了几个小型的Kafka Connect连接器。一个每秒仅生成随机数据,另一个每秒将其记录在控制台中。它们与Schema Registry集成在一起,因此可以使用Avro序列化数据。
我使用Landoop提供的fast-data-dev Docker映像将它们部署到本地Kafka环境中
基本设置有效,并每秒产生一条记录的消息
但是,我想更改主题名称策略。默认一生成两个主题:
${topic}-key${topic}-value根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:
${topic}-${keyRecordName}${topic}-${valueRecordName}根据文档,我的需求适合TopicRecordNameStrategy
我尝试了什么
我创建avroData用于发送值进行连接的对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
Run Code Online (Sandbox Code Playgroud)
然后用它来创建SourceRecord响应对象
该文档指出,为了在Kafka Connect中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,将它们添加:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)
问题
连接器似乎忽略了这些属性,并继续使用旧的${topic}-key和${topic}-value主题。
题
Kafka Connect应该支持不同的主题策略。我设法通过编写自己的版本AvroConverter和硬编码来解决此问题,该主题策略是我所需要的。但是,这似乎不是一种好方法,并且在尝试使用Sink Kafka Connector占用数据时也带来了问题。我复制了这个主题,所以有一个旧名称(${topic}-key)的版本,它可以正常工作
将主题策略指定给Kafka Connect的正确设置是什么?
您缺少key.converterand value.converter前缀,因此配置将传递给对流器。所以代替:
key.subject.name.strategy
value.subject.name.strategy
Run Code Online (Sandbox Code Playgroud)
你要:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
Run Code Online (Sandbox Code Playgroud)
来源https://docs.confluent.io/current/connect/managing/configuring.html:
要将配置参数传递给键和值转换器,请在定义默认转换器时给它们加上前缀
key.converter.或value.converter.在工作程序配置中添加前缀。请注意,仅当在key.converter或value.converter属性中指定了相应的转换器配置时才使用它们。