kafka stream-如何为KTable设置新密钥

Ste*_*cek 1 java apache-kafka apache-kafka-streams

我是Kafka Streams的新手,正在使用1.0.0版。我想从一个值中为KTable设置一个新键。

使用KStream时,可以通过使用像这样的selectKey()方法来完成。

kstream.selectKey ((k,v) -> v.newKey)
Run Code Online (Sandbox Code Playgroud)

但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?

All*_*ood 8

@Matthias 的回答让我走上了正确的道路,但我认为有一段示例代码可能会有所帮助

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);
Run Code Online (Sandbox Code Playgroud)

KGroupedTable 聚合()文档:https ://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org 。 apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-

  • 不过,您编写的程序是非确定性的...同样的问题也适用于 @Jackson Oliveira 方法:如果您有两个映射到同一个新键的上游记录,您不知道两者中的哪一个最终会出现在表上... (2认同)

Mat*_*Sax 7

如果要设置新键,则需要重新组合KTable:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);
Run Code Online (Sandbox Code Playgroud)

由于键对于KTable必须是唯一的(与KStream相反),因此需要指定一个聚合函数,该函数将具有相同(新)键的所有记录聚合为一个值。