我想 ReKey 一个 GlobalKTable (可能在初始化它时,因为我相信它们只在创建后被读取)。
这可能吗?
我在 Spring/Java Kafka Streams 应用程序中处理了两个主题。第一个没有压缩,第二个是。两者都使用 Avro 作为键和值。
该应用程序从第一个(未压缩的)主题中流式传输记录,并通过 附加来自压缩主题的其他数据KStream#leftJoin。压缩的主题已作为 GlobalKTable 引入应用程序,通过创建StreamsBuilder#globalTable()并需要保持这种方式(我需要应用程序的每个实例中可用的主题的所有分区的每条记录)。
我知道有人谈论支持非主键连接(https://issues.apache.org/jira/browse/KAFKA-3705),但据我所知,我还不能这样做......
@Configuration
@EnableKafkaStreams
public class StreamsConfig {
@Autowired
private MyCustomSerdes serdes;
@Bean
public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {
GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
serdes.getAvroKeyOne()
serdes.getAvroValueOne()
));
KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
serdes.getAvroKeyTwo(),
serdes.getAvroValueOne()
));
kStream.join(
globalTable,
/**
* the KeyValueMapper. I need to rekey the Global table as well to the
* corresponding String (which …Run Code Online (Sandbox Code Playgroud)