我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以获得高可用性,以防其中一个来源失败.我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复.
使用leftJoinKStream 的方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容.这似乎是因为,根据Kafka Streams开发人员指南,
KStream-KStream leftJoin始终由来自主流的记录驱动
因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在.主流重新联机后,输出将恢复正常.
我也尝试使用outerJoin(添加重复记录),然后转换为KTable和groupByKey以消除重复,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
Run Code Online (Sandbox Code Playgroud)
但我偶尔也会重复一遍.我也commit.interval.ms=200用来让KTable经常发送到输出流.
接近此合并以从多个相同的输入主题获得一次输出的最佳方法是什么?
我有一个Kafka Streams应用程序消耗并生成具有3个代理的Kafka集群,复制因子为3.除了消费者偏移主题(50个分区)之外,所有其他主题每个只有一个分区.
当代理尝试首选副本时,Streams应用程序(运行在与代理完全不同的实例上)失败并显示错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
...
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Run Code Online (Sandbox Code Playgroud)
Streams应用程序尝试成为分区的领导者是正常的,因为它在不属于Kafka集群的服务器上运行?
我可以通过以下方式重现此行为:
bin/kafka-preferred-replica-election.sh --zookeeper localhost我的问题似乎与报告的失败类似,所以我想知道这是否是一个新的Kafka Streams错误.我的完整堆栈跟踪与报告的失败中链接的要点完全相同(此处).
另一个可能有趣的细节是,在领导者选举期间,我controller.log在经纪人中得到这些消息:
[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) …Run Code Online (Sandbox Code Playgroud) 我正在构建一个 Java 8 应用程序,该应用程序查询 Kafka 主题以获取一条消息。每个请求都会创建一个新Consumer对象(独立于任何现有Consumer对象),它轮询我的 Kafka 主题,获取一条记录,然后Consumer关闭。这种情况每天发生约 20 万次,并且每个请求都独立于所有其他请求,因此我认为我无法重用消费者。基本上,用户从主题请求消息并为他们创建消费者,然后关闭。这种情况平均每秒发生约 2 次,但它是任意的,因此可能发生 10 次/秒或 1 次/小时,无法知道。
一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得很大,垃圾收集无法清除它。最终,更多的 CPU 时间专门用于 GC,并且一切都崩溃了,直到我重新启动 Kafka。
这是导致问题的代码的近似版本,具有while(true)近似的真实行为(在生产中,消费者不是在 while 循环中创建的,而是在用户从主题请求消息时按需创建的):
Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));
// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If …Run Code Online (Sandbox Code Playgroud)