小编sob*_*cko的帖子

Kafka Streams 反序列化处理程序

我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 。当发生错误时,通过成功记录错误并继续,它可以正常工作。但是,假设我的传入消息有连续的错误流,我停止并重新启动 kafka 流应用程序,然后我看到失败并已在上次尝试中记录的消息再次重新出现(它们正在被记录)再次)。如果我尝试将错误的消息发送到 DLQ,问题会更大。重新启动时,它们会再次发送到 DLQ。一旦我有一个良好的记录,看起来偏移量会进一步移动,并且在另一次重新启动时不会再次看到已经记录的消息。有没有办法在流应用程序中手动提交?我尝试使用 ProcessorContext#commit(),但这似乎没有任何效果。

我通过运行此处提供的示例重现了此行为:https ://github.com/confluenceinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluence/examples/streams/WordCountLambdaExample .java

我将传入值 Serde 更改为,Serdes.Integer().getClass().getName()以强制输入出现反序列化错误,并将提交间隔减少到仅 1 秒。还将以下内容添加到配置中。

streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

一旦失败,当我重新启动应用程序时,之前失败的相同记录会再次出现在日志中。例如,每次重新启动应用程序时,我都会在控制台上看到以下输出。我希望不会再次尝试这些,因为我们之前已经跳过了它们。

2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113 org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4 2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114 org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

看起来当反序列化异常发生时,这个标志永远不会在这里设置为true: …

apache-kafka apache-kafka-streams

7
推荐指数
0
解决办法
5312
查看次数

kafka流中的聚合和状态存储保留

我有一个像下面这样的用例.对于每个传入的事件,我想查看某个字段以查看它的状态是否从A更改为B,如果是,则将其发送到输出主题.流程是这样的:具有键"xyz"的事件带有状态A,并且一段时间之后另一个事件带有带状态B的键"xyz".我使用高级DSL获得此代码.

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来使用DSL编写此逻辑?

关于状态存储的几个问题由上面的代码中的聚合创建.

  1. 它默认是创建内存状态存储吗?
  2. 如果我有无限数量的唯一传入密钥,会发生什么?如果它默认使用内存存储,那么我不需要切换到持久存储吗?我们如何处理DSL中的情况?
  3. 如果状态存储非常大(内存或持久性),它如何影响启动时间?如何使流处理等待以便存储完全初始化?或者Kafka Streams会确保在处理任何传入事件之前完全初始化状态存储吗?

提前致谢!

apache-kafka-streams

6
推荐指数
1
解决办法
2543
查看次数

Kafka Streams 如何分配分区?

我有一个 Kafka Streams 应用程序,它从 topic-1 asKStream和 topic-2 as接收数据KTable。两个主题各有 4 个分区。假设我有 4 个应用程序实例正在运行,那么每个实例将从主题 1 的单个分区接收数据。接收为 的 topic-2 怎么样KTable?在这种情况下,是否所有实例都会从所有 4 个分区接收数据?如果两个主题的键相同,那么我猜 Kafka Streams 将确保为应用程序分配相同的分区。如果 topic-2 没有任何键,而是应用程序将从值本身推断出它,那么这意味着所有实例都需要从 topic-2 获取所有分区。Kafka Streams 如何处理这种情况?

谢谢!

apache-kafka-streams

5
推荐指数
1
解决办法
1528
查看次数

KTable状态存储的持久性

如果在实现KTable时使用持久性存储,状态存储是否会在应用程序重启时保持持久性?例如,如果我使用以下命令:

StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier =      Stores.persistentKeyValueStore("queryable-store-name");
 KTable<Long,String> table = builder.table(
   "foo",
   Materialized.as(storeSupplier)
               .withKeySerde(Serdes.Long())
               .withValueSerde(Serdes.String())
Run Code Online (Sandbox Code Playgroud)

重新启动时,状态存储“ queryable-store-name”是否可以通过先前运行的状态进行访问?可以说,我向主题foo发送了50条记录,并且在状态存储中实现了该记录。然后重新启动应用程序,我在状态存储中还会保留那50条记录吗?如果没有,有没有办法实现?

谢谢!

apache-kafka-streams

3
推荐指数
1
解决办法
2278
查看次数

标签 统计

apache-kafka-streams ×4

apache-kafka ×1