无法检索Kafka Streams中的状态存储键的元数据

chr*_*etd 8 apache-kafka apache-kafka-streams

我正在尝试使用Kafka Streams和分布在两个实例中的状态存储.以下是商店和相关KTable的定义方式:

KTable<String, Double> userBalancesTable = kStreamBuilder.table(
        "balances-table",
        Consumed.with(String(), Double()),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(BALANCES_STORE).withKeySerde(String()).withValueSerde(Double())
);
Run Code Online (Sandbox Code Playgroud)

接下来,我有一些流处理逻辑,它将一些数据聚合到这个balance-tableKTable:

transactionsStream
        .leftJoin(...)
        ...
        .aggregate(...)
        .to("balances-table", Produced.with(String(), Double()));
Run Code Online (Sandbox Code Playgroud)

在某些时候,我是从REST处理程序查询状态存储.

ReadOnlyKeyValueStore<String, Double> balances = streams.store(BALANCES_STORE, QueryableStoreTypes.<String, Double>keyValueStore());
return Optional.ofNullable(balances.get(userId)).orElse(0.0);
Run Code Online (Sandbox Code Playgroud)

哪个工作完美 - 只要我有一个流处理实例.

现在,我正在添加第二个实例(注意:我的主题都有2个分区).如文档所述,状态存储BALANCES_STORE基于每个记录的密钥在实例之间分配(在我的示例中,密钥是用户ID).因此,一个实例必须:

  1. 调用以KafkaStreams#metadataForKey发现哪个实例正在处理包含要检索的键的状态存储的一部分

  2. 对此实例进行RPC(例如REST)调用以检索它

我的问题是调用KafkaStreams#metadataForKey始终返回一个空元数据对象.但是,KafkaStreams#allMetadataForStore()返回包含两个实例的元数据对象.所以它的行为就像它不知道我正在查询的密钥,虽然在州商店的工作中查找它.

调试

附加说明:我的application.server属性设置正确.

谢谢!