无法将主题映射到 kafka elasticsearch 连接器中的指定索引

1 elasticsearch apache-kafka apache-kafka-connect

尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)

我的配置是

 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "localtopic", 
  "topic.index.map" : "localtopic:indexoftopic",
  "connection.url" : "aws elasticsearch url",
  "type.name" : "event",
  "key.ignore" : "false",
  "schema.ignore" : "true",
  "schemas.enable" : "false",
  "transforms" : "InsertKey,extractKey",
  "transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.InsertKey.fields" : "event-id",
  "transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field" : "event-id"
 }
Run Code Online (Sandbox Code Playgroud)

索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的

即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???

小智 6

下面对我有用,但是,我只将 1 个主题映射到不同的索引名称

"transforms": "addSuffix",  
"transforms.addSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addSuffix.regex": "topic1.*",
"transforms.addSuffix.replacement": "index1",
Run Code Online (Sandbox Code Playgroud)

因此,通过上述转换任何主题,例如topic1, topic1-test, topic1<anystring> 将映射到index1

或者,您也可以使用主题名称进行替换,通过更改最后两行如下,这将选择

    "transforms.addSuffix.regex": "topic.*",
    "transforms.addSuffix.replacement": "index$1",
Run Code Online (Sandbox Code Playgroud)

基本上,您可以使用正则表达式替换部分或完整主题名称。