使用kafka流基于消息密钥将消息发送到主题

Cou*_*unt 1 java apache-kafka apache-kafka-streams

我希望能够根据消息密钥的密钥将Kafkastream中的所有记录发送到另一个主题。例如 Kafka中的流包含名称作为键和记录作为值。我想根据记录的关键将这些记录分散到不同的主题

数据:(jhon-> {jhonsRecord}),(sean-> {seansRecord}),(mary-> {marysRecord}),(jhon-> {jhonsRecord2}),预期

  • topic1:名称:jhon->(jhon-> {jhonsRecord}),(jhon-> {jhonsRecord2})
  • topic2:sean->(sean-> {seansRecord})
  • topic3:mary->(mary-> {marysRecord})

下面是我现在执行此操作的方式,但是由于名称列表比较笨拙,因此速度很慢。另外,即使有一些名字的记录,我也需要遍历整个列表。请提出修复建议

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 
Run Code Online (Sandbox Code Playgroud)

war*_*iak 5

我认为您应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)功能。它使您能够计算每条消息的主题名称。

样例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);
Run Code Online (Sandbox Code Playgroud)

  • 假设与输出主题没有任何关系,我比我的回答要好得多 (2认同)