EGR*_*GRA 5 java apache-kafka-streams
使用 Kafka Stream,我总是使用以下代码从参考紧凑主题初始化我的商店:
builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Run Code Online (Sandbox Code Playgroud)
我想kafkaTopic在开店之前过滤一下话题,以剔除一些不必要的商家。
像这样的东西:
GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
.map(MerchantAvro::getDeletionDate)
.isPresent());
...
Run Code Online (Sandbox Code Playgroud)
但是不可能filter在GlobalKTable.
我怎样才能进行这种过滤?