小编Jeh*_*man的帖子

在Kafka Connect中使用自定义转换器吗?

我正在尝试将自定义转换器与Kafka Connect一起使用,但似乎无法正确使用。我希望有人对此有经验,可以帮助我解决!

初始情况

怎么了 ?

当连接器启动时,它们会正确加载罐子并找到自定义转换器。确实,这是我在日志中看到的内容:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
Run Code Online (Sandbox Code Playgroud)

然后,我将JSON配置发布到连接器节点之一以创建我的连接器:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}
Run Code Online (Sandbox Code Playgroud)

并收到以下回复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
2666
查看次数

标签 统计

apache-kafka ×1

apache-kafka-connect ×1