use*_*180 4 apache-kafka apache-kafka-streams
我在执行 kstreams 以获取聚合计数时收到此错误。
Exception in thread "KStreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
Run Code Online (Sandbox Code Playgroud)
这是我正在执行的代码
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));
Run Code Online (Sandbox Code Playgroud)
在我添加此抑制代码之前,它曾经运行良好。有什么想法吗?
我认为这对于 serdes 来说不是问题count()。
如果您不通过,则使用Materialized您调用的对象中的 serdes 。count()那串看起来的 Serdes 一直到你传递最后一个 Serdes 的方法。就你而言,是的.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()))。Serdes 不是问题,因为count()和suppress(...)将用于 keySerdes.Integer()和 value Serdes.Long())。
我尝试重现您的异常,只有当我更改了消息中的键类型和由函数(分组键类型)Serdes处理并重新启动应用程序时,我才能做到这一点。当 KafkaStreams 在提交期间尝试刷新数据时会引发异常。suppress
我如何复制它:
首先由生产者生成几条消息并运行以下代码。密钥类型很重要(长)
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Long, Long> events = stream.map((k, v) -> new KeyValue<Long, Long>((long) v.getPageId(), v.getUserId()));
KGroupedStream<Long, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()));
KTable<Windowed<Long>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().longValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Long(),Serdes.Long()));
Run Code Online (Sandbox Code Playgroud)
1-2 分钟后,停止应用程序并将修改恢复为原始代码:密钥类型很重要(整数)
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));
Run Code Online (Sandbox Code Playgroud)
生成一些消息,等待 10 分钟(取决于您的窗口),再生成一些消息并等待直到执行提交(30 秒) - 将抛出您的异常。
出了什么问题?
问题是,suppress(...)旧消息的密钥是使用旧的 Serdes 序列化的。
suppress(...)操作由 执行KTableSuppressProcessor。它有内部缓冲区,用于在将消息转发(当消息过期时)到下一个 ProcessorNode 之前存储消息。
Suppress需要时间戳,因此其作为消息 key 的 buffer 由时间戳和字节数组组成(业务 key 与业务 Serdes序列化后),消息的值只是一个字节数组(业务 value 序列化后)。
总结:buffer内部不关心业务消息类型。内部缓冲区在SUPPRESS 变更日志中具体化。
如果消息被转发到下一个ProcessorNode KTableSuppressProcessor,:
问题是为什么在启动时没有抛出异常,而是在一段时间后抛出异常?
在上面的第一个代码片段中,Long 用作分组的键。当消息传递到 时suppress,
suppress会将密钥序列化为字节数组,并使用时间戳和该字节数组作为其内部缓冲区的密钥。当应用程序停止时,内部缓冲区将具体化到SUPPRESS 更改日志主题中。
如果我们将分组键类型更改为整数(第二个代码片段)并启动应用程序,基于 SUPPRESS 更改日志主题,内部缓冲区将被恢复。在恢复期间,仅从原始密钥中提取时间戳。字节数组,表示业务部分未被触及。
当新消息传递到时,suppress它们将像以前一样进行处理(密钥将被序列化为字节数组,并且时间戳将用作内部缓冲区密钥)。处理每条消息后,KTableSuppressProcessor检查任何缓冲消息的时间戳是否过期,如果发生,请尝试将其转发到下一个 ProcessorNode。
在我们的示例中,作为内部缓冲区中的键,我们有时间戳(长整型)和代表业务键的字节数组(例如,长整型为 8 个字节,整数为 4 个字节)。因此,在转发之前,KTableSuppressProcessor将尝试使用 来反序列化这些数组(它们具有不同的长度)IntegerDeserializer。表示 Long 的字节数组将太长,并且 IntegerDeserializer 将引发异常。该操作不是在应用程序启动时发生,而是在执行提交时发生。
另一个问题可能是:如果我们运行两个版本的程序而没有:,为什么不会抛出异常suppress。
KStreamWindowAggregate(负责聚合)仅在其值被修改时传递聚合消息。因为我们更改了 Serdes,所以我们不会修改旧的聚合(密钥将序列化为不同的字节数组),而是添加新的聚合。另一方面,KTableSuppressProcessor传递所有过期的消息,甚至是那些用旧 Serdes 缓冲的消息。
| 归档时间: |
|
| 查看次数: |
5412 次 |
| 最近记录: |