Ano*_*ssi 6 apache-kafka-streams
我正在编写一个基本应用程序来测试Kafka Streams的Interactive Queries功能.这是代码:
public static void main(String [] args){
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());
final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");
final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized = waypointsStream
.mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
.filter((k,v) -> v.isPresent())
.mapValues(Optional::get);
waypointsDeserialized.groupByKey().aggregate(
() -> 1,
(aggKey, newWaypoint, aggValue) -> {
aggValue = aggValue + 1;
return aggValue;
}, Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer())
);
final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));
streams.cleanUp();
streams.start();
ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<byte[], Integer> range = keyValueStore.all();
while (range.hasNext()) {
KeyValue<byte[], Integer> next = range.next();
System.out.println(next.value);
}
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
protected static Properties createStreamsProperties() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName());
//streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
return streamsConfiguration;
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,每次运行时我都会遇到同样的错误:
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, test-store, may have migrated to another instance.
Run Code Online (Sandbox Code Playgroud)
我只运行了一个应用程序实例,而我正在使用的主题只有一个分区.
知道我做错了什么吗?
Kyl*_*ham 10
看起来你有竞争条件.从kafka流javadoc为KafkaStreams::start()它说:
通过启动所有线程来启动KafkaStreams实例.期望在客户端的生命周期中仅调用此函数一次.因为线程是在后台启动的,所以此方法不会阻止.
https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
你streams.store()之后立即打电话streams.start(),但我打赌你处于尚未完全初始化的状态.
因为代码似乎只是用于测试,所以Thread.sleep(5000)在那里添加一个或者什么东西然后去试试吧.(这不是生产的解决方案)根据您对主题的输入率,这可能会给商店一些时间开始填充事件,以便您KeyValueIterator实际上有一些东西可以处理/打印.
| 归档时间: |
|
| 查看次数: |
3431 次 |
| 最近记录: |