Mar*_*vin 1 apache-kafka-streams
我正在尝试实现一个Transformer类
public class StreamSorterByTimeStampWithDelayTransformer < V >
implements Transformer< Long, V, KeyValue< Long, V > >
Run Code Online (Sandbox Code Playgroud)
该类的构造函数为每个实例创建一个StateStore,因此:
this.state_store_name = "state_store_" + this.hashCode();
KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder =
Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );
Run Code Online (Sandbox Code Playgroud)
Transformer init方法引用StateStore:
public void init(ProcessorContext context) {
this.context = context;
this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
// schedule a punctuate() method every "poll_ms" (wall clock time)
this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME,
(timestamp) -> pushOutOldEnoughEntries() );
}
Run Code Online (Sandbox Code Playgroud)
我想我必须省略一步,因为当调用getStateStore时,它会导致异常,说:
Processor KSTREAM-TRANSFORM-0000000003 has no access to StateStore
Run Code Online (Sandbox Code Playgroud)
我在省略或做错了什么?
您需要将商店连接到变压器:
stream.transform(..., this.state_store_name);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
716 次 |
| 最近记录: |