kafka 流状态存储 max.request.size 参数问题

Jay*_*Jay 2 apache-kafka apache-kafka-streams

我们在项目中使用Kafka流状态存储,并且我们想要存储超过1MB的数据,但是我们遇到了以下异常:

该消息序列化后为 1760923 字节,大于您使用 max.request.size 配置配置的最大请求大小。

然后我点击链接添加前缀到 StreamsConfig 以启用设置默认内部主题配置并添加以下配置:

topic.max.request.size=50000000
Run Code Online (Sandbox Code Playgroud)

然后应用程序工作正常,并且当状态存储内部主题已创建时它可以正常工作,但是当 Kafka 重新启动并且状态存储主题已丢失/删除时,Kafka 流处理器需要在启动时自动创建内部状态存储主题应用程序,此时,它抛出异常,其中显示:

"Aorg.apache.kafka.streams.errors.StreamsException: Could not create topic data-msg-seq-state-store-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)....
.....
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: max.request.size".
Run Code Online (Sandbox Code Playgroud)

解决方案是我们可以手动创建内部主题,但这应该不是一个好的解决方案。

你能帮我解决这个问题吗?如果我错过了任何配置?

非常感谢。

2020年6月17日更新:仍然没有解决问题。任何人都可以帮忙吗?

小智 6

您正在寻找的解决方案在于您在启动流之前设置的 Kafka Stream 的配置属性。

props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");
Run Code Online (Sandbox Code Playgroud)

我在这里使用的值是 5 MB(以字节为单位)。您可以更改该值以满足您的需要。