小编Dig*_*ela的帖子

Kafka Connect - 无法更改添加缺失字段 SinkRecordField{schema=Schema{BYTES}, name='CreateUID', isPrimaryKey=true},

我正在使用 JDBC 源连接器从 Teradata 表读取数据并推送到 Kafka 主题。但是当我尝试使用 JDBC 接收器连接器读取 Kafka 主题并推送到 Oracle 表时,它会抛出以下错误。我确信错误是由于参数造成的pk.mode,并且pk.fields我不确定要使用什么。

我的 terradata 有一个主键 UserID+ DatabaseID 。我在 Oracle 中创建了表,主键为 Userid+databaseID。

** ERROR Cannot ALTER to add missing field SinkRecordField{schema=Schema{BYTES}, 
name='CreateUID', isPrimaryKey=true}, as it is not 
optional and does not have a default value** 
Run Code Online (Sandbox Code Playgroud)

下面是我的水槽连接器-

{name=teradata_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=TERADATA_ACCESSRIGHTS
connection.url=
connection.user=
connection.password=
auto.create=false
table.name.format=TERADATA_ACCESSRIGHTS
pk.mode=record_value
pk.fields=USERID+DATABASEID
auto.evolve=true
insert.mode=upsert
}
Run Code Online (Sandbox Code Playgroud)

请建议我如何使用具有给定主键的 JDBC 接收器连接器。

apache-kafka apache-kafka-connect

5
推荐指数
1
解决办法
3898
查看次数

Kafka Connect JDBC接收器连接器无法正常工作

我正在尝试使用Kafka Connect JDBC sink连接器将数据插入Oracle,但它会引发错误.我已经尝试了架构的所有可能配置.以下是示例.

请提示我是否遗漏了以下任何内容是我的配置文件和错误.

案例1-第一配置

internal.value.converter.schemas.enable=false .
Run Code Online (Sandbox Code Playgroud)

所以我得到了

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)

[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)

[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-connect

4
推荐指数
1
解决办法
6903
查看次数

使用 Spark 从 teradata 表读取数据时出现 ExceptionInInitializer 错误

我使用下面的代码从 teradata 读取数据但出现错误

val jdbcDF = spark.read
  .format("jdbc")
  .option("url",s"jdbc:teradata://${TeradataDBHost}/database=${TeradataDBDatabase}")
  .option("dbtable", TeradataDBDatabase+"."+TeradataDBTable)
  .option("driver","com.teradata.jdbc.TeraDriver")
  .option("user", TeradataDBUsername)
  .option("password", TeradataDBPassword)
  .load()
Run Code Online (Sandbox Code Playgroud)

错误堆栈跟踪

val jdbcDF = spark.read
  .format("jdbc")
  .option("url",s"jdbc:teradata://${TeradataDBHost}/database=${TeradataDBDatabase}")
  .option("dbtable", TeradataDBDatabase+"."+TeradataDBTable)
  .option("driver","com.teradata.jdbc.TeraDriver")
  .option("user", TeradataDBUsername)
  .option("password", TeradataDBPassword)
  .load()
Run Code Online (Sandbox Code Playgroud)

jdbc teradata apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1638
查看次数