我可以从外部方法遍历KTable中的项目

ars*_*eny 5 scala apache-kafka apache-kafka-streams

我有一个kafka主题和一个听它的KTable.

我想写一个http POST请求,它将遍历ktable中的当前项,对它们执行一些操作并回写到主题

基本上我有:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
Run Code Online (Sandbox Code Playgroud)

....

override def refreshTokens = {

    accessTokenTable.mapValues {
        new ValueMapper[String, String] {
            override def apply(value: String) = {
                value
            }
        }
    }.print(token_topic_name)
}
Run Code Online (Sandbox Code Playgroud)

当我尝试调用此方法时,不会打印/写入主题

我错过了什么?我唯一的选择是将消息从ktable写入hashmap并从那里读取它?它错过了ktables的全部意义?

ars*_*eny 0

经过长时间的调查,解决方案是查询其背后的存储(rocksDB)而不是表。

如此处记录的:汇合

正确的解决方案是使用 GlobalKTable 来避免“状态存储可能已迁移到另一个实例”错误,如此处所述

这段代码在 kafka 0.10.2.1 中对我有用:

    private val accessTokenTable: GlobalKTable[String, String] = builder.globalTable(token_topic_name, token_store_string)

    private val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
    val store: ReadOnlyKeyValueStore[String,String] = stream.store(token_store_string,QueryableStoreTypes.keyValueStore[String,String]())
Run Code Online (Sandbox Code Playgroud)

....

    override def refreshTokensFlow = {

       store.all.asScala.map( tuple => {
       // logic goes here
           System.out.println(tuple.key + ": " + tuple.value)
       }
    }
Run Code Online (Sandbox Code Playgroud)