我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 。当发生错误时,通过成功记录错误并继续,它可以正常工作。但是,假设我的传入消息有连续的错误流,我停止并重新启动 kafka 流应用程序,然后我看到失败并已在上次尝试中记录的消息再次重新出现(它们正在被记录)再次)。如果我尝试将错误的消息发送到 DLQ,问题会更大。重新启动时,它们会再次发送到 DLQ。一旦我有一个良好的记录,看起来偏移量会进一步移动,并且在另一次重新启动时不会再次看到已经记录的消息。有没有办法在流应用程序中手动提交?我尝试使用 ProcessorContext#commit(),但这似乎没有任何效果。
我通过运行此处提供的示例重现了此行为:https ://github.com/confluenceinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluence/examples/streams/WordCountLambdaExample .java
我将传入值 Serde 更改为,Serdes.Integer().getClass().getName()以强制输入出现反序列化错误,并将提交间隔减少到仅 1 秒。还将以下内容添加到配置中。
streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);。
一旦失败,当我重新启动应用程序时,之前失败的相同记录会再次出现在日志中。例如,每次重新启动应用程序时,我都会在控制台上看到以下输出。我希望不会再次尝试这些,因为我们之前已经跳过了它们。
2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
看起来当反序列化异常发生时,这个标志永远不会在这里设置为true: …
我有一个像下面这样的用例.对于每个传入的事件,我想查看某个字段以查看它的状态是否从A更改为B,如果是,则将其发送到输出主题.流程是这样的:具有键"xyz"的事件带有状态A,并且一段时间之后另一个事件带有带状态B的键"xyz".我使用高级DSL获得此代码.
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来使用DSL编写此逻辑?
关于状态存储的几个问题由上面的代码中的聚合创建.
提前致谢!
我有一个 Kafka Streams 应用程序,它从 topic-1 asKStream和 topic-2 as接收数据KTable。两个主题各有 4 个分区。假设我有 4 个应用程序实例正在运行,那么每个实例将从主题 1 的单个分区接收数据。接收为 的 topic-2 怎么样KTable?在这种情况下,是否所有实例都会从所有 4 个分区接收数据?如果两个主题的键相同,那么我猜 Kafka Streams 将确保为应用程序分配相同的分区。如果 topic-2 没有任何键,而是应用程序将从值本身推断出它,那么这意味着所有实例都需要从 topic-2 获取所有分区。Kafka Streams 如何处理这种情况?
谢谢!
如果在实现KTable时使用持久性存储,状态存储是否会在应用程序重启时保持持久性?例如,如果我使用以下命令:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"foo",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
Run Code Online (Sandbox Code Playgroud)
重新启动时,状态存储“ queryable-store-name”是否可以通过先前运行的状态进行访问?可以说,我向主题foo发送了50条记录,并且在状态存储中实现了该记录。然后重新启动应用程序,我在状态存储中还会保留那50条记录吗?如果没有,有没有办法实现?
谢谢!