ars*_*eny 5 scala apache-kafka apache-kafka-streams
我有一个kafka主题和一个听它的KTable.
我想写一个http POST请求,它将遍历ktable中的当前项,对它们执行一些操作并回写到主题
基本上我有:
private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()
Run Code Online (Sandbox Code Playgroud)
....
override def refreshTokens = {
accessTokenTable.mapValues {
new ValueMapper[String, String] {
override def apply(value: String) = {
value
}
}
}.print(token_topic_name)
}
Run Code Online (Sandbox Code Playgroud)
当我尝试调用此方法时,不会打印/写入主题
我错过了什么?我唯一的选择是将消息从ktable写入hashmap并从那里读取它?它错过了ktables的全部意义?
经过长时间的调查,解决方案是查询其背后的存储(rocksDB)而不是表。
如此处记录的:汇合
正确的解决方案是使用 GlobalKTable 来避免“状态存储可能已迁移到另一个实例”错误,如此处所述。
这段代码在 kafka 0.10.2.1 中对我有用:
private val accessTokenTable: GlobalKTable[String, String] = builder.globalTable(token_topic_name, token_store_string)
private val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()
val store: ReadOnlyKeyValueStore[String,String] = stream.store(token_store_string,QueryableStoreTypes.keyValueStore[String,String]())
Run Code Online (Sandbox Code Playgroud)
....
override def refreshTokensFlow = {
store.all.asScala.map( tuple => {
// logic goes here
System.out.println(tuple.key + ": " + tuple.value)
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
202 次 |
| 最近记录: |