px5*_*5x2 3 java apache-kafka apache-kafka-streams
我有一个简单的流应用程序将一个主题作为输入流并将 KeyValues 转换为另一个主题,例如:
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
.stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
.transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);
static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {
static final String STORE_NAME = "test-store";
private KeyValueStore<Long, CategoryDto> store;
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
}
@Override
public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
store.put(key, value);
return KeyValue.pair(key, value);
}
@Override
public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}
Run Code Online (Sandbox Code Playgroud)
在这里我不得不使用转换器,因为我需要获取存储和更新相关值。
问题是使用本地状态存储和仅将值放入一个简单的HashMap内部有ForeachAction什么区别?
在这种情况下使用本地状态存储有什么好处?
虽然它没有显示在您的代码中,但我假设您以某种方式阅读并使用了存储状态。
使用简单的(在内存中)存储您的状态HashMap会使您的状态根本不持久,这意味着当发生以下任一情况时,您的状态将丢失(这些都不是异常的,假设它会经常发生):
非持久状态的问题在于,当上述任何一种情况发生时,kafka-streams 将在最后提交的偏移量处重新开始处理。因此在崩溃/停止/重新平衡之前处理的所有记录都不会被重新处理,这意味着HashMap当处理重新开始时您的内容将为空。这当然不是你想要的。
另一方面,如果您使用提供的状态存储之一,kafka-streams 将确保,一旦处理在上面列出的任何中断后重新启动,状态将可用,就好像处理从未停止过一样,无需重新处理任何之前处理的记录。
| 归档时间: |
|
| 查看次数: |
3787 次 |
| 最近记录: |