Kafka JDBC 连接器加载所有数据,然后增量

mik*_*010 7 elasticsearch apache-kafka apache-kafka-connect confluent-platform

我试图弄清楚如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后使 es 与我的 kafka 流同步。目前,我首先使用带有模式 = 批量的连接器来执行此操作,然后将其更改为时间戳。这工作正常。

但是,如果我们想将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本来以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 以将模式设置为批量,重新启动所有内容,给出是时候加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样一个脚本的原因是偶尔,批量更新会通过我们尚无法控制的 etl 过程来纠正历史数据,并且此过程不会更新时间戳)

有没有人做类似的事情并找到了更优雅的解决方案?

mik*_*010 1

过了很长一段时间才回到这个话题。该方法能够解决这个问题并且永远不必使用批量模式

  1. 停止连接器
  2. 擦除每个连接器 jvm 的偏移文件
  3. (可选)如果您想要执行完整的擦除和加载,您可能还想使用 kafka/connect utils/rest api 删除您的主题(并且不要忘记状态主题)
  4. 重新启动连接。