在卡夫卡,如何根据生产时间获得精确的偏移量

Po *_*hou 12 timestamp offset

我需要在一天内每小时收到卡夫卡的消息.每隔一小时我就会开始一份工作来消费1小时前制作的消息.例如,如果当前时间是20:12,我将在19:00:00和19:59:59之间消费该消息.这意味着我需要在时间19:00:00获得开始偏移,并在时间19:59:59之前结束偏移.我使用了SimpleConsumer.getOffsetsBefore,如" 0.8.0 SimpleConsumer Example "中所示.问题是返回的偏移量与作为参数给出的时间戳不匹配.例如,当时间戳为19:00:00时,我收到时间16:38:00产生的消息.

Lij*_*ohn 13

getOffsetsByTimes()可以使用以下kafka consumer api方法,它可以从0.10.0或更高版本获得.请参阅JavaDoc.

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
Run Code Online (Sandbox Code Playgroud)


cmc*_*abe 6

正如其他回复所指出的那样,旧版本的Kafka只有一种将时间映射到偏移的近似方法.但是,自从Kafka 0.10.0(2016年5月发布)以来,Kafka为每个主题保留了时间索引.这将使您有效地从时间到精确的偏移.您可以使用KafkaConsumer #offsetsForTimes方法来访问此信息.

有关如何在KIP-33设计讨论页面上实现基于时间的索引的更多详细信息.


JnB*_*ymn 5

在Kafka中,目前无法获得与特定时间戳相对应的偏移量 - 这是设计使然.如Jay Kreps的Log Article顶部所述,偏移数为日志提供了一种与挂钟时间分离的时间戳.将偏移量作为您的时间概念,您可以知道任何两个系统是否处于一致状态,只需购买知道他们读取的偏移量.对于不同服务器上的不同时钟时间,闰年,日光节省时间,时区等,从来没有任何混淆.它有点不错......

现在......所有这一切,如果你知道你的服务器在某个时间X下降,那么实际上,你真的想知道相应的偏移量.你可以近距离接触.kafka机器上的日志文件是根据它们开始编写的时间命名的,并且存在一个kafka工具(我现在找不到),让您知道哪些偏移与这些文件相关联.如果您想知道确切的时间戳,那么您必须对您发送给Kafka的消息中的时间戳进行编码.

  • 请注意,Kafka很快将支持此功能:https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index (4认同)
  • 是的,我使用“在您发送给 Kafka 的消息中编码时间戳”解决了这个问题,并且它已经工作了几个月。 (2认同)