如何在 1 周后恰好处理来自分布式日志代理(例如 Kafka)的日志?

raj*_*aju 5 python java apache-kafka apache-spark apache-storm

如果我想处理 Kafka 刚好 1 周的日志,可以进行哪些设置?

用例是我维护过去 1 周用户活动的累积统计数据。我对最终的一致性没问题,不需要恰好 1 周的统计数据。

我有一个流设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的活动都应从统计数据中删除。我可以实现的方法之一是使用批处理(例如 Spark)从统计信息中删除超过 1 周的活动。

有什么方法可以使用流处理从统计信息中删除超过 1 周的用户活动?各种方法的优缺点是什么?

如果我在 Kafka 中至少使用一次并且统计数据偏离了基本事实,那么定期更正统计数据的方法是什么?

Jav*_*cal 1

如果您的 Kafka 消息具有正确的时间戳,那么您可以获得上周时间戳的偏移量。所以你可以使用..

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
Run Code Online (Sandbox Code Playgroud)

文档

按时间戳查找给定分区的偏移量。每个分区返回的偏移量是相应分区中时间戳大于或等于给定时间戳的最早偏移量。

要获取主题分区列表,您可以调用consumer.assignment()(aftersubscribe()assign()) 返回Set<TopicPartition>分配给消费者。地图中的值Long基本上是时间戳。因此,对于您的情况下的所有键,它将具有相同的值(即 1 周前的时间戳)

现在,您已经有了一个Map<TopicPartition, OffsetAndTimestamp>. 您现在可以使用seek(TopicPartition partition, long offset)来查找每个偏移量。

consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
Run Code Online (Sandbox Code Playgroud)

现在,您的消费者将看到一周前的消息。因此,当您poll()从上周到现在进行民意调查时。

您可以更改时间戳以满足您的要求,例如,任何早于 1 周的内容都表示从时间戳 0 到上周时间戳。

所有上周数据意味着,2weekOldTimestamp - 1weekOldTimestamp

因此,在这种情况下,您必须寻找2weekOldTimestamp并处理每个分区,直到遇到1weekOldTimestamp