具有逻辑类型的 Avro 模式不能与最新的 confluence-kafka 一起使用

Fed*_*ore 5 python avro apache-kafka confluent-schema-registry

我们使用 kafka 与 avro 架构,并且架构注册表设置为FULL兼容性。我们的模式使用logicalType字段,例如:

{
  "name": "MyRecord",
  "type": "record",
  "fields": [
    {
      "name": "created_at",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

这对于我们正在使用的相当旧的版本来说工作得很好confluent-kafka,因为它依赖于avro-python31.8。但是,最近confluent-kafka依赖于avro-python31.10,消息序列化失败,并显示TypeError: unhashable type: 'mappingproxy'

我已经打开了一个 PR 来解决这个问题,但并没有引起太多关注。

假设它不会被合并,我还有哪些其他选项可以升级到最新版本confluent-kafka

我看到的唯一解决方案是摆脱logicalType,但这将是不兼容的架构更改,因此我要么放弃兼容性FULL,要么使用绑定到不同架构的不同主题。

即使上述方法有效,我也必须手动将毫秒转换为时间戳,这对我们的代码库来说是一个很大的变化。

eik*_*eik 0

您的架构不是有效的 Avro。文档指出,\xe2\x80\x9c 当为类型为 union 的记录字段指定默认值时,默认值的类型必须与union \xe2\x80\x9d 的第一个元素匹配。对于您的字段\xe2\x80\x9c \xe2\x80\x9d 是唯一有效的默认值,否则您必须将架构定义为created_atnull

\n
...\n    {\n      "name": "created_at",\n      "type": [\n        {\n          "type": "long",\n          "logicalType": "timestamp-millis"\n        },\n        "null"\n      ],\n      "default": 1\n    }\n...\n\n
Run Code Online (Sandbox Code Playgroud)\n

默认值为 1970 年 1 月 1 日(实际上是午夜后 1 毫秒),这可能不是您想要的。

\n