Edd*_*Edd 2 apache-kafka-streams
我有一个输入流:
KStream<String, X> inputStream = ...
我想以这样的方式操作(过滤然后聚合)GlobalKTable<String, Y>,我可以输出到 a然后我可以使用以下方式读回:
KeyValueIterator<String, Y> = streams.store("y-global-store", QueryableStoreTypes.keyValueStore()).all()
Streams DSL 可以支持这个吗?如果输出表是 a 似乎是可能的KTable,但是鉴于我在这家商店中的数据量很少,我想使用 aGlobalKTable
这是我的处理器,它将 a 转换KStream<String, X>为 aKTable<String, Y>
KTable<String, Y> outputTable = inputStream
.filter(...)
.groupByKey(Grouped.with(Serdes.String(), ySerde))
.aggregate(
initializeWithNull(),
aggregateXToAY(),
Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-global-store")
.withKeySerde(stringSerde)
.withValueSerde(tagRecordSerde)
)
Run Code Online (Sandbox Code Playgroud)
但是,这不会创建一个GlobalKTable,我错过了什么?
Stream DSL 不支持从 KStream 构建 GlobalKTable。似乎创建 GlobalKTable 的唯一方法是使用StreamsBuilder. globalTable("input_topic_for_globalktable")
我认为 DSL 不支持以这种方式创建 GlobalKtable 的原因是每个应用程序实例都包含整个 GlobalKTable 状态,因此默认情况下禁用日志记录(它不会将更改日志记录到更改日志主题),因此它直接使用输入主题恢复状态过程(容错),此主题必须启用日志压缩。
一种解决方案是您必须在作为outputTableKTable输出之前为此输入主题准备数据:
outputTable.toStream().to("input_topic_for_globalktable");
Run Code Online (Sandbox Code Playgroud)
或者直接使用outputTableKTable的 changelog 主题(我认为这个解决方案更好,因为您不需要为新主题额外的磁盘空间),它的名称为:
<your-application_id>-y-global-store-changelog
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
224 次 |
| 最近记录: |