如何使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect中的jdbc接收器消耗?

Jas*_*ter 1 change-data-capture apache-kafka apache-kafka-connect debezium

我有使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗给另一个mysql的问题.

因为debezium对kafka主题产生的模式和有效负载与kafka connect jdbc sink期望的模式不兼容.

当jdbc接收器想要使用数据并在另一个mysql中创建记录时,我得到异常.

我该如何解决这个问题?

Ran*_*uch 7

Debezium生成的消息结构确实与JDBC接收器预期的消息结构不同.JDBC接收器期望消息中的每个字段对应于行中的字段,因此消息对应于行的"后"状态.OTOH,Debezium MySQL连接器执行变更数据捕获,这意味着它不仅仅包括行的最新状态.具体来说,连接器使用包含行的主键或唯一键列的键输出消息,并使用包含信封结构的消息值:

  • 操作,例如是插入,更新还是删除
  • 发生更改之前的行的状态(插入时为null)
  • 发生更改的行的状态(删除时为null)
  • 特定于源的信息,包括服务器元数据,事务ID,数据库和表名,事件发生时的服务器时间戳以及有关发现事件的位置的详细信息等.
  • 连接器生成事件的时间戳

解决这种差异的最简单方法是使用Kafka 0.10.2.x(目前最新版本为0.10.2.1)和Kafka Connect的新单消息转换(SMT).每个Kafka Connect连接器都可以配置零个或多个SMT链,这些SMT可以在将消息写入Kafka之前转换源连接器的输出,或者在将Kafka作为输入传递到接收器连接器之前转换从Kafka读取的消息.SMT故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代Kafka Streams或功能更强大的其他流处理系统,可以加入多个输入流,以及可以执行非常复杂的操作并跨多个消息维护状态.

如果您正在使用Kafka Streams进行任何类型的处理,那么您应该考虑在Kafka Streams应用程序中操作消息结构.如果没有,那么SMT是解决问题的好方法.实际上,有两种方法可以使用SMT来调整消息结构.

第一种选择是使用带有Debezium连接器的SMT来提取/保留行的"后"状态,并在将所有其他信息写入Kafka之前将其丢弃.当然,您将在Kafka主题中存储较少的信息,并丢弃一些可能在将来有价值的CDC信息.

第二个和IMO首选选项是将源连接器保持原样,并将所有CDC消息保留在Kafka主题中,但是然后使用带有接收器连接器的SMT来提取/保留行的"后"状态在将消息传递到JDBC接收器连接器之前丢弃所有其他信息.您可以使用Kafka Connect中包含的现有SMT之一,但您可以考虑编写自己的SMT来完全按照自己的意愿行事.