Ste*_*cek 1 java apache-kafka apache-kafka-streams
我是Kafka Streams的新手,正在使用1.0.0版。我想从一个值中为KTable设置一个新键。
使用KStream时,可以通过使用像这样的selectKey()方法来完成。
kstream.selectKey ((k,v) -> v.newKey)
Run Code Online (Sandbox Code Playgroud)
但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?
@Matthias 的回答让我走上了正确的道路,但我认为有一段示例代码可能会有所帮助
final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
// First, going to set the new key to the user's application id
(userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
// Initiate the aggregate value
() -> null,
// adder (doing nothing, just passing the user through as the value)
(applicationId, user, aggValue) -> user,
// subtractor (doing nothing, just passing the user through as the value)
(applicationId, user, aggValue) -> user
);
Run Code Online (Sandbox Code Playgroud)
KGroupedTable 聚合()文档:https ://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org 。 apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-
如果要设置新键,则需要重新组合KTable:
KTable newTable = table.groupBy(/*put select key function here*/)
.aggregate(...);
Run Code Online (Sandbox Code Playgroud)
由于键对于KTable必须是唯一的(与KStream相反),因此需要指定一个聚合函数,该函数将具有相同(新)键的所有记录聚合为一个值。