Posts by Ano*_*ssi

Kafka Streams - The state store may have migrated to another instance

I am writing a basic application to test the Interactive Queries feature of Kafka Streams. Here is the code:

public static void main(String [] args){

    StreamsBuilder builder = new StreamsBuilder();

    KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
    StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());

    final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");

    final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized =  waypointsStream
                                                                        .mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
                                                                        .filter((k,v) -> v.isPresent())
                                                                        .mapValues(Optional::get);

    waypointsDeserialized.groupByKey().aggregate(
            () -> 1,
            (aggKey, newWaypoint, aggValue) -> {

                aggValue = aggValue + 1;
                return aggValue;

            }, Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer())
    );

    final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));

    streams.cleanUp();
    streams.start();    

    ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());

    KeyValueIterator<byte[], …
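The message in the title is typically raised when a store is queried before the `KafkaStreams` instance has finished starting or rebalancing, which the `streams.store(...)` call placed directly after `streams.start()` is likely to hit. One common workaround is to retry the lookup until the store becomes queryable. The following is only a minimal sketch of that idea: `StoreRetry` and `retryUntilAvailable` are hypothetical names, not part of Kafka Streams, and the attempt count and backoff are arbitrary assumptions. The helper is written against plain Java so it stands alone.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka Streams API.
final class StoreRetry {

    private StoreRetry() {}

    // Calls `lookup` until it stops throwing an exception of type `retryOn`,
    // sleeping `backoffMs` between attempts. After `maxAttempts` failures the
    // last exception is rethrown. Any other exception type propagates at once.
    static <T> T retryUntilAvailable(Supplier<T> lookup,
                                     Class<? extends RuntimeException> retryOn,
                                     int maxAttempts,
                                     long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not the "store not ready" case, do not retry
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and give up early.
                        Thread.currentThread().interrupt();
                        throw last;
                    }
                }
            }
        }
        throw last;
    }
}
```

Assuming the code from the question, the store lookup could then be wrapped as `StoreRetry.retryUntilAvailable(() -> streams.store("test-store", QueryableStoreTypes.keyValueStore()), InvalidStateStoreException.class, 20, 500L)`, where `InvalidStateStoreException` comes from `org.apache.kafka.streams.errors`. Waiting for the instance to reach the `RUNNING` state before querying is an alternative to polling.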

apache-kafka-streams

6
score
1
answer
3431
views
