Аль*_*ров 3 apache-kafka-connect
我想使用带有 JSON 且不带架构的 JDBC 接收器连接器。他们写道(来源):
如果您需要在没有 Schema Registry 的情况下使用 JSON 来连接数据,则可以使用 Kafka 支持的 JsonConverter。下面的示例显示了添加到配置中的 JsonConverter 键和值属性:
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false
当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 true 时,键或值不会被视为纯 JSON,而是被视为包含内部架构和数据的复合 JSON 对象。当为源连接器启用这些功能时,架构和数据都位于复合 JSON 对象中。当为接收器连接器启用这些功能时,将从复合 JSON 对象中提取架构和数据。请注意,此实现从不使用架构注册表。
当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 false(默认值)时,仅传递数据,而不传递架构。这减少了不需要模式的应用程序的负载开销。
我配置了连接器:
{
"name": "noschemajustjson",
"config": {
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"schemas.enable": "false",
"name": "noschemajustjson",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "testconnect2",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "utp",
"auto.create": "false",
"auto.evolve": "false"
}
}
Run Code Online (Sandbox Code Playgroud)
但我仍然收到错误:
引起原因:org.apache.kafka.connect.errors.ConnectException:接收器连接器“noschemajustjson2”配置为“delete.enabled=false”和“pk.mode=none”,因此需要具有非空 Struct 值和的记录非空结构模式,但在 (topic='testconnect2',partition=0,offset=0,timestamp=1626416739697) 处找到具有 HashMap 值和空值模式的记录。
那么我应该怎么做才能强制 Connect 在没有模式的情况下工作(只有纯 JSON)?
我想使用带有 JSON 且不带模式的JDBC 接收器连接器
您不能执行此操作 - JDBC Sink 连接器流式传输到关系数据库,而关系数据库具有模式 :-D 因此 JDBC Sink 连接器需要为数据提供模式。
根据您的数据来源,您有不同的选择。
参考资料和资源:
| 归档时间: |
|
| 查看次数: |
5068 次 |
| 最近记录: |