逻辑删除消息是否无法从KTable状态存储中删除记录?

R K*_*R K 3 spring-cloud-stream apache-kafka-streams spring-kafka

我正在创建从KStream处理数据的KTable。但是,当我使用键和有效负载为空的逻辑删除消息时,它并没有从KTable中删除消息。

样本-

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
Run Code Online (Sandbox Code Playgroud)

触发带有空值的消息后,我可以使用有效负载为空的testStream映射函数进行调试,但是它不会删除KTable更改日志“测试存储”上的记录。看起来它甚至没有达到reduce方法,不确定我在这里缺少什么。

感谢任何帮助!

谢谢。

Mat*_*Sax 5

如JavaDocs中所述 reduce()

使用{@code null}键或值的记录将被忽略。

因为该<key,null>记录已删除,因此(genericRecord, v1) -> v1从不执行,所以不会将逻辑删除写入存储或更改日志主题。

对于您所想到的用例,您需要使用一个表示“删除”的替代值,例如,Avro记录中的布尔标志。您的reduce函数需要检查标志并返回null是否设置了标志;否则,必须定期处理记录。