如何在实现 GlobalKTable 之前过滤掉不需要的记录?

EGR*_*GRA 5 java apache-kafka-streams

使用 Kafka Stream,我总是使用以下代码从参考紧凑主题初始化我的商店:

builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Run Code Online (Sandbox Code Playgroud)

我想kafkaTopic在开店之前过滤一下话题,以剔除一些不必要的商家。

像这样的东西:

GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
         .map(MerchantAvro::getDeletionDate)
         .isPresent());
...
Run Code Online (Sandbox Code Playgroud)

但是不可能filterGlobalKTable.

我怎样才能进行这种过滤?

Mat*_*Sax 3

您需要先过滤主题并将结果放入另一个主题。然后,您可以将第二个主题用作GlobalKTable

作为替代方案,您也许可以使用“全局商店”而不是GlobalKTable. 对于这种情况,您可以提供一个自定义Processor,该自定义可以在填充全局存储之前实现过滤器。请参阅定义流处理器

全球商店也是本地商店。不同之处在于,对于“常规存储”,数据是分区的,即每个存储包含不同的数据,而对于全局存储,每个实例加载所有数据(即数据被复制)。因此,该组的每个成员都有自己的全局存储数据副本。