Kafka Connect JDBC sink connector not working

Dig*_*ela 4 apache-kafka kafka-consumer-api apache-kafka-connect

I am trying to insert data into Oracle using the Kafka Connect JDBC sink connector, but it throws an error. I have tried every possible schema configuration; samples are below.

Please let me know if I'm missing anything. Below are my configuration files and the errors.

Case 1 - first configuration:

internal.value.converter.schemas.enable=false

With this, I get:

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ
   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)

Case 2 - second configuration:

internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

Log:

[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

My Oracle connector.properties looks like this:

name=oracle_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=DJ
connection.url=jdbc:oracle:thin:@hostname:port:sid
connection.user=username
connection.password=password
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
auto.create=true
auto.evolve=true

The internal.*.converter settings shown in the two cases above are set in connect-standalone.properties.

My JSON looks like this:

{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}

Rob*_*att 9

Per the documentation:

The sink connector requires knowledge of schemas, so you should use a suitable converter, e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.

So if your data is JSON, you would have the following configuration:

[...]
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
[...]
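For a standalone worker, the equivalent settings go in the worker properties file. A minimal connect-standalone.properties sketch, assuming JSON data; the broker address and offsets file path are placeholders, not values from the question:

bootstrap.servers=localhost:9092
# Converters for the *data* flowing through the connector
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# internal.* converters only cover Connect's own offset/config topics,
# not your records - which is why toggling them in the question had no effect
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets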

The error you're seeing in the second case is related: JsonConverter with schemas.enable requires "schema" and "payload" fields. The JSON you shared does not match this required format.

Here's a simple example of a valid JSON message with schema and payload:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}
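To sanity-check the sink, you could flatten a message like the one above to a single line and publish it with the console producer (broker address and the DJ topic from the question; the broker-list flag matches Kafka versions of that era):

kafka-console-producer --broker-list localhost:9092 --topic DJ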

What's the source of the data you're trying to land in Oracle? If it's Kafka Connect inbound, then simply reusing the same converter configuration (Avro + Confluent Schema Registry) will be easier and more efficient. If it's a custom application, you'll need to get it to either (a) use the Confluent Avro serializer or (b) write JSON in the required format above, providing the schema of the payload inline with the message.
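For option (b), a minimal sketch of what the producing application could do: wrap each flat JSON record in the schema/payload envelope before sending. This is an illustration, not code from the question; the type mapping is deliberately simplified (all ints as int64, all floats as double, every field optional):

import json

# Map Python runtime types to Kafka Connect schema type names.
# bool must be listed before int because bool is a subclass of int.
CONNECT_TYPES = {
    str: "string",
    bool: "boolean",
    int: "int64",
    float: "double",
}

def connect_type(value):
    """Return the Connect schema type name for a Python value."""
    for py_type, connect_name in CONNECT_TYPES.items():
        if isinstance(value, py_type):
            return connect_name
    raise TypeError(f"No Connect type mapping for {type(value).__name__}")

def envelope(record, schema_name="record"):
    """Wrap a flat dict in the schema/payload envelope JsonConverter expects."""
    fields = [
        {"type": connect_type(v), "optional": True, "field": k}
        for k, v in record.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields,
                   "optional": False, "name": schema_name},
        "payload": record,
    }

if __name__ == "__main__":
    # A few fields from the question's sample record, for illustration
    record = {"Item": "12", "OrderNum": "10023", "Quantity": 1,
              "Intended Node Distance": 1125.8}
    print(json.dumps(envelope(record)))

With value.converter.schemas.enable=true, JsonConverter then derives the Connect schema for each record from its schema block, which is what the JDBC sink needs to build the INSERT.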

  • That doesn't answer my question. You can use Kafka Connect to ingest from multiple tables, whether with a single connector or with multiple connectors. (2)