我正在尝试将自定义转换器与Kafka Connect一起使用,但似乎无法正确使用。我希望有人对此有经验,可以帮助我解决!
我的自定义转换器的类路径为custom.CustomStringConverter
。
为避免出现任何错误,我的自定义转换器当前只是先前存在的StringConverter的副本/粘贴(当然,在我开始使用它时,它将改变)。 https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
我有一个由3个节点组成的kafka connect集群,这些节点正在运行confluent的官方docker映像(confluentinc/cp-kafka-connect:3.3.0
)。
每个节点都配置为使用我的转换器加载一个jar(使用docker卷)。
当连接器启动时,它们会正确加载罐子并找到自定义转换器。确实,这是我在日志中看到的内容:
[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
Run Code Online (Sandbox Code Playgroud)
然后,我将JSON配置发布到连接器节点之一以创建我的连接器:
{
"name": "hdfsSinkCustom",
"config": {
"topics": "yellow",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "custom.CustomStringConverter",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
"topics.dir": "yellow_storage",
"flush.size": "1",
"rotate.interval.ms": "1000"
}
}
Run Code Online (Sandbox Code Playgroud)
并收到以下回复:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains …
Run Code Online (Sandbox Code Playgroud)