我有一个kafka主题和一个听它的KTable.
我想写一个http POST请求,它将遍历ktable中的当前项,对它们执行一些操作并回写到主题
基本上我有:
private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()
Run Code Online (Sandbox Code Playgroud)
....
override def refreshTokens = {
accessTokenTable.mapValues {
new ValueMapper[String, String] {
override def apply(value: String) = {
value
}
}
}.print(token_topic_name)
}
Run Code Online (Sandbox Code Playgroud)
当我尝试调用此方法时,不会打印/写入主题
我错过了什么?我唯一的选择是将消息从ktable写入hashmap并从那里读取它?它错过了ktables的全部意义?