Unable to convert Kafka topic data into structured JSON with the Confluent Elasticsearch sink connector

Gee*_*ahm 8 python mongodb elasticsearch apache-kafka debezium

I am building a data pipeline using Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.


MongoDB

  • Version 3.6
  • Sharded cluster

Kafka

  • Confluent Platform 4.1.0
  • MongoDB source connector: Debezium 0.7.5
  • Elasticsearch sink connector

Elasticsearch

  • Version 6.1.0

Since I'm still testing, all Kafka-related services are running on a single server.

  • Start ZooKeeper

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
  • Start the Kafka broker

    $ bin/kafka-server-start etc/kafka/server.properties
    
  • Start the Schema Registry

    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  • Start the MongoDB source connector

    $ bin/connect-standalone \
      etc/schema-registry/connect-avro-standalone.properties \
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
  • Start the Elasticsearch sink connector

    $ bin/connect-standalone \
      etc/schema-registry/connect-avro-standalone2.properties \
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8084
    

Everything above works fine. The Kafka connector captures the data changes (CDC) and successfully sends them to Elasticsearch via the sink connector. The problem is that I can't convert the string-typed message data into a structured data type. For example, let's consume the topic after making some changes in MongoDB.

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result.

    "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

And if I then look in Elasticsearch, I get the following result.

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I am trying to achieve is the following:

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }"
     }
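The transformation I'm after seems mechanical: take the JSON string out of the `patch` union member, parse it, and flatten the `{"$oid": ...}` wrapper. Below is a minimal Python sketch of just that step, assuming the envelope has already been Avro-decoded (the function and variable names are mine, not part of any connector):

```python
import json

def unwrap_patch(envelope):
    """Turn a Debezium MongoDB envelope into a flat document.

    `envelope` is the already-deserialized record, where the `patch`
    union member holds the document as a JSON string.
    """
    raw = envelope["patch"]["string"]        # the stringified document
    doc = json.loads(raw)
    doc["oid"] = doc.pop("_id")["$oid"]      # flatten {"_id": {"$oid": ...}}
    return doc

# Sample envelope, taken from the console-consumer output above.
envelope = {
    "after": None,
    "patch": {"string": '{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},'
                        '"name" : "higee","salary" : 100,'
                        '"origin" : "South Korea"}'},
}
print(unwrap_patch(envelope))
# {'name': 'higee', 'salary': 100, 'origin': 'South Korea', 'oid': '5ad97f982a0f383bb638ecac'}
```

If I could run this between the source topic and the sink topic, the sink connector would only ever see flat documents.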

Some options I have tried, and am still considering, are as follows.

  • Logstash

    • Case 1: don't know how to parse these characters (\u0002, \u0001)

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
         }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • Result

        {
        "message" => "H\u0002?\u0001{\"_id\" : \
            {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
             \"name\" : \"higee\", \
             \"salary\" : 101, \
             \"origin\" : \"South Korea\"} \
             \u0002\n0.7.5\nhigee \ 
             \u0018172.31.50.13\u001Ahigee.higee2 \ 
             ???\v\u0002\u0002??????? \u0002\u0002u\u0002?????X",
        "tags" => [[0] "_jsonparsefailure"]
        }
        
    • Case 2

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => avro {
              schema_uri => "./test.avsc"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
        }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • test.avsc

        {
            "namespace": "example",
            "type": "record",
            "name": "Higee",
            "fields": [
              {"name": "_id", "type": "string"},
              {"name": "name", "type": "string"},
              {"name": "salary",  "type": "int"},
              {"name": "origin", "type": "string"}
            ]
         }
        
      • Result

        An unexpected error occurred! {:error=>#<NoMethodError: 
        undefined method `type_sym' for nil:NilClass>, :backtrace=> 
        ["/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
        "org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
        logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
        avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
        vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
        thread_runner'", "/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
        thread_runner'"]}
        
  • Python client

    • Consume the topic, manipulate the data, and produce to a different topic, so that the Elasticsearch sink connector can consume well-formed messages from the Python-manipulated topic
    • kafka-python library: unable to decode the message

      from kafka import KafkaConsumer
      
      consumer = KafkaConsumer(
                   'higee.higee.higee',               # topics are positional args
                   bootstrap_servers='localhost:9092',
                   auto_offset_reset='earliest'
                 )
      
      for message in consumer:
          message.value.decode('utf-8')
      
      >>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
          invalid continuation byte
      
    • confluent_kafka: not compatible with Python 3
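Looking back at the `'utf-8' codec can't decode byte` error, the bytes are apparently not text at all: values written by the AvroConverter use Confluent's wire format, i.e. a zero magic byte plus a 4-byte big-endian schema ID, followed by binary Avro. A stdlib-only sketch of peeling off that header (actually decoding the Avro body would still require fetching the schema from the registry):

```python
import struct

def split_confluent_frame(raw: bytes):
    """Split a Confluent-framed message value into (schema_id, avro_payload).

    Wire format: 1 magic byte (0x00), a 4-byte big-endian schema ID,
    then the binary Avro body.
    """
    if len(raw) < 5:
        raise ValueError("message too short for Confluent wire format")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id, raw[5:]

# A fabricated frame: schema ID 21 followed by an opaque Avro body.
frame = b"\x00\x00\x00\x00\x15" + b"\x02\x0ahigee"
schema_id, payload = split_confluent_frame(frame)
print(schema_id)  # 21
```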


Any idea how to jsonify the data in Elasticsearch? Below are the sources I searched.

Thanks in advance.


Some attempts

1) I changed my connect-mongo-source.properties file as follows to test the transformation.

    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Below is the error log I got. Not yet being familiar with Kafka, let alone with the Debezium platform, I wasn't able to debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and left connect-mongo-source.properties unchanged.

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

I got the following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
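For what it's worth, the `$set` failure looks consistent with the SMT expecting update events whose `patch` is a `{"$set": {...}}` document, while some of my events apparently carry the full document (and inserts populate `after` instead). A defensive version of the unwrapping logic, sketched in Python purely for illustration (the real SMT is Java), treating `after`/`patch` as plain JSON strings for brevity:

```python
import json

def extract_document(envelope):
    """Pick whichever envelope member holds the document.

    Inserts/reads populate `after`; updates populate `patch`, which may
    be a full document or a {"$set": {...}} partial update.
    """
    raw = envelope.get("after") or envelope.get("patch")
    if raw is None:
        return None                      # e.g. a delete event
    doc = json.loads(raw)
    return doc.get("$set", doc)          # unwrap $set when present

update = {"after": None, "patch": '{"$set": {"salary": 101}}'}
full   = {"after": None, "patch": '{"name": "higee", "salary": 100}'}
print(extract_document(update))   # {'salary': 101}
print(extract_document(full))     # {'name': 'higee', 'salary': 100}
```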

3) I changed test.avsc and ran Logstash. I got no error message, but the result was not what I expected: the salary, name, and origin fields were all empty even though they had been given non-null values. I was even able to read the data correctly through the console consumer.

$ cat test.avsc
>>>
    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }

$ cat logstash3.conf
>>>
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }

    output {
      stdout {
       codec => rubydebug
      }
    }

$ bin/logstash -f logstash3.conf
>>>
    {
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    },
    "@timestamp" => 2018-04-25T09:39:07.962Z
    }

cri*_*007 2

Python client

You must use the Avro Consumer, otherwise you will get 'utf-8' codec can't decode byte

Even that example won't work, because you still need the Schema Registry to look up the schema.

The prerequisites of Confluent's Python client show that it does work with Python 3.x

Nothing is stopping you from using a different client, either, so not sure why you've only tried Python.

Logstash Avro codec

  1. The JSON codec can't decode Avro data, and I don't think the json filter following an avro input codec will work either
  2. Your Avro schema is wrong: you're missing the $oid in place of the _id
  3. There is a difference between "raw Avro" (which includes the schema in the message itself) and Confluent's encoded version (which only carries the schema ID from the registry). Meaning, Logstash doesn't integrate with the Schema Registry... at least not without a plugin

Your AVSC should actually look more like this:

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

However, Avro doesn't allow names to start with anything other than the regex [A-Za-z_], so the $oid would be a problem.

While I wouldn't recommend it (nor have I actually tried it), one possible way to get JSON-encoded Avro data from the Avro console consumer into Logstash would be to use the pipe input plugin

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  }
}

Debezium

Note that the after value is always a string, and that by convention it contains a JSON representation of the document

http://debezium.io/docs/connectors/mongodb/

I think this applies to the patch values as well, but I don't really know Debezium.

Kafka doesn't parse JSON on the fly unless you use a Simple Message Transform (SMT). Reading the docs you linked to, you should probably add these to your Connect Source properties instead:

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Also worth pointing out that field flattening is on the roadmap - DBZ-561

Kafka Connect Elasticsearch

Elasticsearch doesn't parse or process encoded JSON string objects without something like Logstash or its JSON Processor; it just indexes them as one whole string body.

If I remember correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.

In other words, the generated mapping follows this pattern:

"patch": {
    "string": "...some JSON object string here..."
  },

Whereas you actually need something like this instead - perhaps by defining your ES index mapping manually:

"patch": {
   "properties": {
      "_id": {
        "properties" {
          "$oid" :  { "type": "text" }, 
          "name" :  { "type": "text" },
          "salary":  { "type": "int"  }, 
          "origin": { "type": "text" }
      },

Again, though, not sure whether the dollar sign is allowed there.
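A sketch of creating such an index up front with only the Python standard library is below. Treat it as an untested assumption: the index and type names come from your sink config above, and whether ES 6.x accepts $oid as a field name is exactly the open question.

```python
import json
import urllib.request

def patch_mapping():
    """Candidate mapping for the nested patch document.

    Unverified sketch; in particular the `$oid` field name may or
    may not be accepted by Elasticsearch.
    """
    return {
        "mappings": {
            "kafka-connect": {  # matches type.name in the sink config
                "properties": {
                    "patch": {
                        "properties": {
                            "_id": {
                                "properties": {
                                    "$oid":   {"type": "text"},
                                    "name":   {"type": "text"},
                                    "salary": {"type": "integer"},
                                    "origin": {"type": "text"},
                                }
                            }
                        }
                    }
                }
            }
        }
    }

def create_index(es_url="http://localhost:9200", index="higee.higee.higee"):
    """PUT the mapping before the sink connector first writes to the index."""
    req = urllib.request.Request(
        f"{es_url}/{index}",
        data=json.dumps(patch_mapping()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    return urllib.request.urlopen(req)
```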

Kafka Connect MongoDB 源

If none of the above works, you can try a different connector.