Jas*_*ter 1 change-data-capture apache-kafka apache-kafka-connect debezium
我有使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗给另一个mysql的问题.
因为debezium对kafka主题产生的模式和有效负载与kafka connect jdbc sink期望的模式不兼容.
当jdbc接收器想要使用数据并在另一个mysql中创建记录时,我得到异常.
我该如何解决这个问题?
Debezium生成的消息结构确实与JDBC接收器预期的消息结构不同.JDBC接收器期望消息中的每个字段对应于行中的字段,因此消息对应于行的"后"状态.OTOH,Debezium MySQL连接器执行变更数据捕获,这意味着它不仅仅包括行的最新状态.具体来说,连接器使用包含行的主键或唯一键列的键输出消息,并使用包含信封结构的消息值:
解决这种差异的最简单方法是使用Kafka 0.10.2.x(目前最新版本为0.10.2.1)和Kafka Connect的新单消息转换(SMT).每个Kafka Connect连接器都可以配置零个或多个SMT链,这些SMT可以在将消息写入Kafka之前转换源连接器的输出,或者在将Kafka作为输入传递到接收器连接器之前转换从Kafka读取的消息.SMT故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代Kafka Streams或功能更强大的其他流处理系统,可以加入多个输入流,以及可以执行非常复杂的操作并跨多个消息维护状态.
如果您正在使用Kafka Streams进行任何类型的处理,那么您应该考虑在Kafka Streams应用程序中操作消息结构.如果没有,那么SMT是解决问题的好方法.实际上,有两种方法可以使用SMT来调整消息结构.
第一种选择是使用带有Debezium连接器的SMT来提取/保留行的"后"状态,并在将所有其他信息写入Kafka之前将其丢弃.当然,您将在Kafka主题中存储较少的信息,并丢弃一些可能在将来有价值的CDC信息.
第二个和IMO首选选项是将源连接器保持原样,并将所有CDC消息保留在Kafka主题中,但是然后使用带有接收器连接器的SMT来提取/保留行的"后"状态在将消息传递到JDBC接收器连接器之前丢弃所有其他信息.您可以使用Kafka Connect中包含的现有SMT之一,但您可以考虑编写自己的SMT来完全按照自己的意愿行事.
| 归档时间: |
|
| 查看次数: |
1030 次 |
| 最近记录: |