kafka-connect-elasticsearch:当使用“write.method”作为 upsert 时,是否可以在 kafka 主题上使用相同的 AVRO 对象来发送部分文档?

pri*_*ixa 2 elasticsearch apache-kafka apache-kafka-connect

我正在尝试对 Elasticsearch (ES) kafka 连接器使用“write.method”upsert。从我的 kafka 流应用程序中,我正在编写我想要更新插入的文档,该文档位于 ES 连接器配置为读取的 kafka 主题上。我在这个主题上使用 avro 对象作为 kafka 值。我的文档的 AVRO 定义如下所示:

{
  "type": "record",
  "name": "Document",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
    },
    {
      "name": "name",
      "type": ["null", "string"]
    },
    {
      "name": "address",
      "type": ["null", "string"]
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

该文档有时仅包含 ID 和名称,有时仅包含地址。当我只发送地址时,id 和 name 会被覆盖,反之亦然。我已设置behavior.on.null.valuesignore希望 ES 连接器忽略 null id 和 name 值,但这并不能按预期工作。

尽管当我在 kafka 主题上使用两个不同的 AVRO 对象时,第一个仅包含 id 和名称,另一个仅包含地址,upsert 模式行为符合预期。但是对于同一个kafka主题允许多个AVRO对象定义,我需要将主题的兼容模式设置为NONE,这并不理想。

解决当前问题的正确方法是什么?

Rob*_*att 5

该设置behavior.on.null.values = ignore只是告诉连接器,如果它收到一条消息,其中整个消息为空,则忽略该消息(其他选项是失败,或者删除 Elasticsearch 中与该消息的键与空值匹配的目标文档,即墓碑消息)。

连接器不支持您描述的部分更新行为。它可以插入/更新/删除,但只能插入整个文档

如果您想要部分更新插入行为,那么您需要自己实现这一点,无论是在自定义连接器中还是通过在 Kafka Streams 应用程序中存储状态,以便能够在每次增量出现时发出完整的记录。

部分更新可以通过write.method=upsert