我正在使用 JDBC 源连接器从 Teradata 表读取数据并推送到 Kafka 主题。但是当我尝试使用 JDBC 接收器连接器读取 Kafka 主题并推送到 Oracle 表时,它会抛出以下错误。我确信错误是由于参数造成的pk.mode,并且pk.fields我不确定要使用什么。
我的 terradata 有一个主键 UserID+ DatabaseID 。我在 Oracle 中创建了表,主键为 Userid+databaseID。
** ERROR Cannot ALTER to add missing field SinkRecordField{schema=Schema{BYTES},
name='CreateUID', isPrimaryKey=true}, as it is not
optional and does not have a default value**
Run Code Online (Sandbox Code Playgroud)
下面是我的水槽连接器-
{name=teradata_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=TERADATA_ACCESSRIGHTS
connection.url=
connection.user=
connection.password=
auto.create=false
table.name.format=TERADATA_ACCESSRIGHTS
pk.mode=record_value
pk.fields=USERID+DATABASEID
auto.evolve=true
insert.mode=upsert
}
Run Code Online (Sandbox Code Playgroud)
请建议我如何使用具有给定主键的 JDBC 接收器连接器。
我正在尝试使用Kafka Connect JDBC sink连接器将数据插入Oracle,但它会引发错误.我已经尝试了架构的所有可能配置.以下是示例.
请提示我是否遗漏了以下任何内容是我的配置文件和错误.
internal.value.converter.schemas.enable=false .
Run Code Online (Sandbox Code Playgroud)
所以我得到了
[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, …Run Code Online (Sandbox Code Playgroud) 我使用下面的代码从 teradata 读取数据但出现错误
val jdbcDF = spark.read
.format("jdbc")
.option("url",s"jdbc:teradata://${TeradataDBHost}/database=${TeradataDBDatabase}")
.option("dbtable", TeradataDBDatabase+"."+TeradataDBTable)
.option("driver","com.teradata.jdbc.TeraDriver")
.option("user", TeradataDBUsername)
.option("password", TeradataDBPassword)
.load()
Run Code Online (Sandbox Code Playgroud)
错误堆栈跟踪
val jdbcDF = spark.read
.format("jdbc")
.option("url",s"jdbc:teradata://${TeradataDBHost}/database=${TeradataDBDatabase}")
.option("dbtable", TeradataDBDatabase+"."+TeradataDBTable)
.option("driver","com.teradata.jdbc.TeraDriver")
.option("user", TeradataDBUsername)
.option("password", TeradataDBPassword)
.load()
Run Code Online (Sandbox Code Playgroud)