是否可以重新键入 GlobalKTable?

Cas*_*sey 6 java spring apache-kafka apache-kafka-streams spring-kafka

我想 ReKey 一个 GlobalKTable (可能在初始化它时,因为我相信它们只在创建后被读取)。

这可能吗?

我在 Spring/Java Kafka Streams 应用程序中处理了两个主题。第一个没有压缩,第二个是。两者都使用 Avro 作为键和值。

该应用程序从第一个(未压缩的)主题中流式传输记录,并通过 附加来自压缩主题的其他数据KStream#leftJoin。压缩的主题已作为 GlobalKTable 引入应用程序,通过创建StreamsBuilder#globalTable()并需要保持这种方式(我需要应用程序的每个实例中可用的主题的所有分区的每条记录)。

我知道有人谈论支持非主键连接(https://issues.apache.org/jira/browse/KAFKA-3705),但据我所知,我还不能这样做......

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

  @Autowired
  private MyCustomSerdes serdes;

  @Bean
  public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne()
      serdes.getAvroValueOne()
    ));

    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));

    kStream.join(
      globalTable,
       /**
        * the KeyValueMapper. I need to rekey the Global table as well to the
        * corresponding String (which it's data will have) if I want this join
        * to return results
        */
      (streamKey, streamValue) -> {return streamKey.getNewStringKey()},
      (/**ValueJoiner Deal**/)
    );
  }

}
Run Code Online (Sandbox Code Playgroud)

Mic*_*oll 4

我想重新设置 GlobalKTable 的密钥(可能是在初始化它时,因为我相信它们只有在创建后才会被读取)。

这可能吗?

今天还没有对此的直接支持。您已经提到了即将开展的工作,例如添加对非主键联接的全局表的支持,但这尚不可用。

今天你可以做什么:你可以将原始 Kafka 主题重新键入(重新分区)到一个新主题中,然后将重新键入的主题读取到你的全局 KTable 中。也许这是您的一个选择。

  • 重新分区可以使用普通的 KStream/KTable 来完成,这将产生一个新的、重新分区的 Kafka 主题。在这里,应用程序的实例是否只能看到“它的”键并不重要——除非重新分区本身需要全局信息(这种情况很少见)。然后,将重新分区的主题读入 GlobalKTable,这使得所有数据可供应用程序中的每个实例使用。 (2认同)