Kafka Connect 没有模式,只有 JSON

Аль*_*ров 3 apache-kafka-connect

我想使用带有 JSON 且不带架构的 JDBC 接收器连接器。他们写道(来源):

如果您需要在没有 Schema Registry 的情况下使用 JSON 来连接数据,则可以使用 Kafka 支持的 JsonConverter。下面的示例显示了添加到配置中的 JsonConverter 键和值属性:

key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false

当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 true 时,键或值不会被视为纯 JSON,而是被视为包含内部架构和数据的复合 JSON 对象。当为源连接器启用这些功能时,架构和数据都位于复合 JSON 对象中。当为接收器连接器启用这些功能时,将从复合 JSON 对象中提取架构和数据。请注意,此实现从不使用架构注册表。

当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 false(默认值)时,仅传递数据,而不传递架构。这减少了不需要模式的应用程序的负载开销。

我配置了连接器:

{
  "name": "noschemajustjson",
  "config": {
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "schemas.enable": "false",
    "name": "noschemajustjson",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "testconnect2",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "********",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "table.name.format": "utp",
    "auto.create": "false",
    "auto.evolve": "false"
  }
}
Run Code Online (Sandbox Code Playgroud)

但我仍然收到错误:

引起原因:org.apache.kafka.connect.errors.ConnectException:接收器连接器“noschemajustjson2”配置为“delete.enabled=false”和“pk.mode=none”,因此需要具有非空 Struct 值和的记录非空结构模式,但在 (topic='testconnect2',partition=0,offset=0,timestamp=1626416739697) 处找到具有 HashMap 值和空值模式的记录。

那么我应该怎么做才能强制 Connect 在没有模式的情况下工作(只有纯 JSON)?

Rob*_*att 7

我想使用带有 JSON 且不带模式的JDBC 接收器连接器

您不能执行此操作 - JDBC Sink 连接器流式传输到关系数据库,而关系数据库具有模式 :-D 因此 JDBC Sink 连接器需要为数据提供模式。

根据您的数据来源,您有不同的选择。

  • 如果它是从 Kafka Connect 摄取的,请使用支持架构(Avro、Protobuf、JSON Schema)的转换器
  • 如果它是由您可以控制的应用程序生成的,请让该应用程序使用架构(Avro、Protobuf、JSON 架构)序列化该数据
  • 如果它来自您无法控制的某个地方,那么您需要预处理该主题以添加显式架构并将其写入一个新主题,然后由 JDBC Sink 连接器使用。

参考资料和资源: