Eaz*_*zyH 5 error-handling exception apache-kafka spring-cloud-stream apache-kafka-streams
我已经实现了kafka流应用程序。假设流当前正在处理的对象字段之一包含数字而不是字符串值。目前,当处理逻辑中抛出异常时,例如。.transform()
方法,整个流被终止,我的应用程序停止处理数据。
我想跳过此类无效记录并继续处理输入主题上可用的下一条记录。此外,我不想在流处理代码中实现任何 try-catch 语句。
为了实现这一点,我实现了StreamsUncaughtExceptionHandler
它返回StreamThreadExceptionResponse.REPLACE_THREAD
枚举,以便生成新线程并继续处理等待输入主题的下一条记录。然而,事实证明流消费者偏移量没有提交,当新的线程启动时,它会获取刚刚杀死前一个流线程的旧记录......由于逻辑是相同的,新线程也将无法处理错误记录并再次失败。某种循环会产生新线程,但每次都会在同一条记录上失败。
是否有任何干净的方法可以跳过失败的记录并保持流处理下一条记录?
请注意,我不是在问DeserializationExceptionHandler
或ProductionExceptionHandler
。
归档时间: |
|
查看次数: |
2130 次 |
最近记录: |