小编ars*_*eny的帖子

我可以从外部方法遍历KTable中的项目

我有一个kafka主题和一个听它的KTable.

我想写一个http POST请求,它将遍历ktable中的当前项,对它们执行一些操作并回写到主题

基本上我有:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
Run Code Online (Sandbox Code Playgroud)

....

override def refreshTokens = {

    accessTokenTable.mapValues {
        new ValueMapper[String, String] {
            override def apply(value: String) = {
                value
            }
        }
    }.print(token_topic_name)
}
Run Code Online (Sandbox Code Playgroud)

当我尝试调用此方法时,不会打印/写入主题

我错过了什么?我唯一的选择是将消息从ktable写入hashmap并从那里读取它?它错过了ktables的全部意义?

scala apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
202
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

scala ×1