Arn*_*lle 6 apache-kafka apache-kafka-streams
Java:OpenJdk 11 Kafka:2.2.0 Kafka 流库:2.3.0
我正在尝试在docker容器中部署我的 Kafka 流应用程序,但在尝试使用 TopicAuthorizationException 创建内部状态存储时失败。它在本地运行良好。本地和服务器之间的主要区别在于,它连接到部署了 Kafka 的服务器并使用通常的Kerberos身份验证进行身份验证。我无法理解身份验证和本地商店之间的联系。
我的流看起来像这样:
StreamsBuilder builder = new StreamsBuilder();
//We stream from the source topic
KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
.with(Serdes.serdeFrom(String.class), INPUT_SERDE));
//We group per room and window
TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
.groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));
//We make them a list
KStream<Windowed<String>, WindowedMessages> grouped = windowed
.aggregate(WindowedMessages::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream();
//Filter
KStream<Windowed<String>, FilterResult> filtered = grouped
.mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));
//Re map to its original form
KStream<String, OutputPayload> reduced = filtered
.flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
.getMessages()
.stream().map(payload -> new KeyValue<>(key.key(), payload))
.collect(toList()));
//Target topic
reduced.to(sinkTopic, Produced
.with(Serdes.serdeFrom(String.class), SERDE));
return builder.build();
Run Code Online (Sandbox Code Playgroud)
它接收一个消息流,将其窗口化,聚合每个窗口的所有消息,仅保留列表的最后一个版本,带有“已抑制”,然后将整个列表进行平面映射以将其转发到另一个主题。
每次我遇到这种异常时:
错误消息是:org.apache.kafka.common.errors.TopicAuthorizationException:未授权访问主题:[主题授权失败。] 2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2- a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] 在处理过程中遇到以下意外的Kafka异常,这通常表示内部流错误:-[rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744]-[]-[]org.apache.kafka.streams.errors.StreamsException:可以不创建主题过滤器-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog。在 org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212) 在 org.apache.kafka.streams。
这不是“认证”,而是“授权”。查看您的日志消息,它显示“未授权访问主题”。据我所知,您无权创建支持本地抑制状态存储的内部主题“filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog”。默认情况下,Kafka Streams 中包含的状态存储由 Kafka 代理上的主题支持。此内部主题在故障转移期间用于恢复本地状态存储。这些内部主题由 Kafka Streams 应用程序自动创建,因此应用程序需要具有适当的权限才能创建它们。
有关更多信息,请参阅https://kafka.apache.org/23/documentation/streams/developer-guide/security.html#id1 。其中表示“运行应用程序的主体必须设置 ACL,以便应用程序有权创建、读取和写入内部主题。”
| 归档时间: |
|
| 查看次数: |
19685 次 |
| 最近记录: |