如何获取 Kafka 流中当前的 Kafka 主题?

Veh*_*kij 5 apache-kafka apache-kafka-streams

我的场景是我使用创建大量共享前缀(例如 house.door、house.room )的 Kafka 主题,并使用 Kafka 流正则表达式主题模式 API 消耗所有主题。一切看起来都很好,我得到了数据的密钥和消息。

为了处理数据,我需要主题名称,以便我可以根据主题名称进行连接,但我不知道如何在 Kafka 流 DSL 中获取主题名称。

解决我的问题的一种可能方法是将主题名称与我的消息一起保存。不过如果能直接获取主题名称就更好了。

那么,如何获取 Kafka 流中当前的 Kafka 主题?

Mat*_*Sax 1

常见问题解答: https: //docs.confluence.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

\n\n
\n

记录元数据可通过处理器 API 访问。由于其处理器 API 集成,还可以通过 DSL 间接访问它。

\n\n

使用 Processor API,您可以通过 ProcessorContext 访问记录元数据。例如,您可以在 Processor#init() 期间将对上下文的引用存储在处理器的实例字段中,然后在 Processor#process() 中查询处理器上下文(对于 Transformer 也是如此)。上下文会自动更新以匹配当前正在处理的记录,这意味着 ProcessorContext#partition() 等方法始终返回当前记录\xe2\x80\x99s 元数据。在计划的 punctuate() 函数中调用处理器上下文时需要注意一些注意事项,有关详细信息,请参阅 Javadocs。

\n\n

例如,如果将 DSL 与自定义 Transformer 结合使用,则可以将输入 record\xe2\x80\x99s 值转换为还包括分区和偏移元数据,并且后续 DSL 操作(例如映射或过滤器)可以利用此信息。

\n
\n