有没有办法获取 kafka 流中消耗的每条消息的偏移量?

Sai*_*and 4 apache-kafka-streams

为了避免读取已处理但在 KAFKA STREAMS 被杀死时错过提交的消息,我想获取每条消息的偏移量以及键和值,以便我可以将其存储在某处并使用它来避免重新处理已处理的消息。

Mic*_*oll 5

是的,这是可能的。请参阅常见问题解答条目:http://docs.confluence.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

\n\n

我将复制粘贴以下关键信息:

\n\n
\n

访问记录元数据,例如主题、分区和偏移量信息?

\n\n

记录元数据可通过处理器 API访问。\n由于其 处理器 API 集成,还可通过DSL间接访问。

\n\n

使用处理器 API,您可以通过\n 访问记录元数据 ProcessorContext。您可以在 期间将对上下文的引用存储在处理器的\n 实例字段中Processor#init(),然后\n 在 中查询处理器上下文Processor#process(),例如\n(与 相同Transformer)。上下文会自动更新以匹配当前正在处理的记录,这意味着诸如此类的方法ProcessorContext#partition()始终返回当前记录 xe2x80x99s 元数据。在 中调用处理器上下文时需要注意一些注意事项punctuate(),请参阅 Javadocs 了解详细信息。

\n\n

例如,如果您将 DSL 与自定义 结合使用Transformer,\n 您可以将输入记录\xe2\x80\x99s 值转换为还包括分区\n 和偏移元数据,然后可以利用后续的 DSL 操作(例如mapor\n) filter此信息。

\n
\n