在Apache Kafka中读取消息偏移量

Ana*_*and 3 java offset consumer apache-kafka

我非常新Kafka,我们正在使用Kafka 0.8.1.

我需要做的是从主题中消费一条消息.为此,我将不得不用Java编写一个使用者,它将使用来自主题的消息,然后将该消息保存到数据库.保存消息后,将向Java使用者发送一些确认.如果确认为真,则应从主题中消耗下一条消息.如果confirmldgement为false(这意味着由于某些错误消息,从主题中读取,无法保存到数据库中),则应再次读取该消息.

我想我需要使用 Simple Consumer,来控制消息偏移,并且已经通过了这个链接中给出的Simple Consumer示例https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

在此示例中,offset在run方法中计算为' readOffset'.我需要玩那个吗?例如,我可以使用LatestTime()而不是EarliestTime()在假的情况下,我会在使用前将偏移重置为一个offset - 1.

这是我应该怎么做?

san*_*ris 9

我认为你可以使用高级消费者(http://kafka.apache.org/documentation.html#highlevelconsumerapi),这应该比SimpleConsumer更容易使用.我不认为消费者需要重新读取Kafka关于数据库故障的消息,因为消费者已经拥有这些消息并且可以将它们重新发送到数据库或执行其认为合适的任何其他操作.

高级消费者存储从Zookeeper中的特定分区读取的最后一个偏移量(基于消费者组名称),以便当消费者进程终止并稍后重新启动时(可能在其他主机上),它可以继续处理消息离开了.可以定期将此偏移自动保存到Zookeeper(请参阅使用者属性auto.commit.enable和auto.commit.interval.ms),或通过调用将其保存为应用程序逻辑ConsumerConnector.commitOffsets.另请参阅https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example.

我建议你在收到DB确认后关闭自动提交并自行提交补偿.因此,您可以确保在消费者失败的情况下从Kafka重新读取未处理的消息,并且所有提交给Kafka的消息最终将至少一次到达DB(但不​​是"恰好一次").