Jav*_*cal 5 java apache-kafka apache-kafka-connect
I want to apply a set of SMTs (Single Message Transforms) only to specific topics.
I have the following configuration:
connector.class: io.debezium.connector.postgresql.PostgresConnector
name: test
database.server.name: testserver
database.hostname: 127.0.0.1
database.port: 5432
database.dbname: test
database.user: testuser
database.password: "testpass"
schema.whitelist: test
table.whitelist: test.topic1, test.topic2, test.topic3
transforms:unwrap,extractTopic1Key,extractTopic1Value,extractTopic2Key
transforms.unwrap.type: io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.delete.handling.mode: rewrite
transforms.unwrap.drop.tombstones: false
transforms.extractTopic1Key.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic1Value.type: org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractTopic1Key.field: propname
transforms.extractTopic1Value.field: propvalue
transforms.extractTopic1Key.predicate: test_topic1
transforms.extractTopic1Value.predicate: test_topic1
transforms.extractTopic2Key.type:org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic2Key.field: id
transforms.extractTopic2Key.predicate: test_topic2
predicates=test_topic1,test_topic2
predicates.test_topic1.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic1.pattern=.*topic1
predicates.test_topic2.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic2.pattern=.*topic2
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
decimal.handling.mode: string
include.schema.changes: false
#Adds header which says insert,update,delete
transforms.unwrap.operation.header: true
snapshot.mode: initial
heartbeat.interval.ms: 10000
slot.name: test
I get the following error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:308)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
When I remove all transforms except unwrap, it works fine. My sample topic data looks like this.
Key
{"propname":"my.test.prop"}
Value
propname is the primary key of the table.
What is wrong with the above configuration? How can we apply a specific SMT only to specified topics?
PS: I am using Confluent Platform 5.3.1.
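Conditionally applying SMTs has been supported in apache-kafka since version 2.6.0, through KIP-585 (Filter and Conditional SMTs).
You are using Confluent Platform 5.3.1. 5.3.1 is a bugfix release of Confluent Platform that provides Apache Kafka 2.3.0, which predates KIP-585. On that version the predicates and transforms.*.predicate settings most likely have no effect, so each ExtractField transform would run against records from every topic, which would explain the NullPointerException.
So you need to upgrade Kafka Connect to version 2.6.0 or later.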
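To make the gating behaviour concrete, here is a minimal plain-Java sketch of what a topic-name predicate does once it is honoured. It is a model only and does not use the Kafka Connect API: the `Rec` class, `topicMatches`, `applyIf` and `extractKeyField` are hypothetical names invented for illustration, the topic names assume Debezium's `<server>.<schema>.<table>` naming, and the whole-string regex match is my understanding of how `TopicNameMatches` behaves.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

class ConditionalSmtSketch {

    /** Hypothetical stand-in for a Connect record: a topic name plus a key. */
    static final class Rec {
        final String topic;
        final Object key;

        Rec(String topic, Object key) {
            this.topic = topic;
            this.key = key;
        }
    }

    /** Models a topic-name predicate: the regex must match the whole topic name. */
    static boolean topicMatches(String regex, String topic) {
        return topic != null && Pattern.compile(regex).matcher(topic).matches();
    }

    /** Applies the transform only when the predicate matches; otherwise passes the record through. */
    static Rec applyIf(String regex, UnaryOperator<Rec> transform, Rec rec) {
        return topicMatches(regex, rec.topic) ? transform.apply(rec) : rec;
    }

    /** Models ExtractField$Key for a map-shaped key: replaces the key with one of its fields. */
    static UnaryOperator<Rec> extractKeyField(String field) {
        return rec -> new Rec(rec.topic, ((Map<?, ?>) rec.key).get(field));
    }

    public static void main(String[] args) {
        // Assumed topic names, following <server>.<schema>.<table>.
        Rec r1 = new Rec("testserver.test.topic1",
                Collections.singletonMap("propname", "my.test.prop"));
        Rec r2 = new Rec("testserver.test.topic2",
                Collections.singletonMap("id", 7));

        UnaryOperator<Rec> extractPropname = extractKeyField("propname");

        // topic1 matches ".*topic1", so the key is reduced to the field value.
        System.out.println(applyIf(".*topic1", extractPropname, r1).key); // my.test.prop
        // topic2 does not match, so the record is left untouched.
        System.out.println(applyIf(".*topic1", extractPropname, r2).key); // {id=7}
    }
}
```

Without the gate, the `propname` extraction would also be attempted on the topic2 record, whose key has no such field. That is the situation the predicates in the question are meant to prevent, and they only take effect on Kafka Connect 2.6.0 or later.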