Kafka多个分区排序

use*_*022 6 apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?

目前我需要维护订购,但这意味着拥有一个带有单个消费者线程的分区.我想将其更改为多个分区以增加并行性,但不知何故"让它们按顺序排列".

有什么想法吗?谢谢.

Mic*_*oll 12

在这种情况下,您遇到两个问题:

  1. 具有多个分区的Kafka主题,以及Kafka不保证此类多分区主题的全局排序(主题).
  2. 主题及其分区的延迟到达/无序消息的可能性,与时间和时间戳有关.

我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?

简短的回答是:不,当您从具有多个分区的Kafka主题中读取时,仍然无法实现全局顺序.

此外,"分区排序"表示"基于分区中消息的偏移的分区排序".排序保证与消息的时间戳无关.

最后,只有在max.in.flight.requests.per.connection == 1以下情况下才能保证订购:

Apache Kafka文档中的生产者配置设置 :( max.in.flight.requests.per.connection默认值:): 5客户端在阻止之前将在单个连接上发送的最大未确认请求数.请注意,如果此设置设置为大于1并且发送失败,则存在由于重试而导致消息重新排序的风险(即,如果启用了重试).

请注意,此时我们正在讨论消费者行为(这是您最初的问题开始时)和Kafka中的生产者行为的组合.

如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?

即使使用时间戳功能,我们仍然无法实现"每个分区中的每条消息都保持顺序".为什么?因为迟到/无序消息的可能性.

分区按偏移量排序,但不保证按时间戳排序.在实践中,分区的以下内容是完全可能的(时间戳通常是毫秒 - 从此纪元开始):

Partition offsets     0    1    2    3    4    5    6    7    8
Timestamps            15   16   16   17   15   18   18   19   17
                                          ^^
                                         oops, late-arriving data!
Run Code Online (Sandbox Code Playgroud)

什么是迟到/无序消息?想象一下,你有遍布世界各地的传感器,所有传感器都测量它们的局部温度,并将最新测量结果发送到Kafka主题.某些传感器可能具有不可靠的互联网连接,因此它们的测量可能会延迟几分钟,几小时甚至几天.最终他们的延迟测量将进入卡夫卡,但他们将"迟到".对于城市中的移动电话也是如此:有些可能耗尽电池/能源,需要在发送数据之前进行充电,有些可能因为您在地下等地方而失去互联网连接.

假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?

理论上是的,但在实践中这是相当困难的."我们接收所有消息"的假设对于流式传输系统来说实际上是具有挑战性的(即使对于批处理系统,尽管可能在这里通常简单地忽略了迟到数据的问题).你永远不知道你是否真的收到了"所有消息" - 因为可能会有迟到的数据.如果您收到迟到的消息,您希望发生什么?再次重新处理/重新排序"全部"消息(现在包括迟到的消息),或者忽略迟到的消息(从而计算出错误的结果)?从某种意义上说,任何通过"让它们排序所有"而实现的全球排序要么是非常昂贵,要么是最大努力.


Gan*_*alf 1

我没有使用 Kafka 流 - 但可以使用普通消费者来做到这一点。

首先对分区进行排序 - 这假设您已经在每个您想要的分区中查找偏移量或使用 Consumer Group 来执行此操作。

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {

    Set<TopicPartition> pollPartitions = events.partitions();
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
    for (TopicPartition tp : pollPartitions) {
        orderEvents.add(events.records(tp));
    }
    // order the list by the first event, each list is ordered internally also
    orderEvents.sort(new PartitionEventListComparator());
    return orderEvents;
}

/**
 * Used to sort the topic partition event lists so we get them in order
 */
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {

    @Override
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
        long c1 = list1.get(0).timestamp();
        long c2 = list2.get(0).timestamp();
        if (c1 < c2) {
            return -1;
        } else if (c1 > c2) {
            return 1;
        }

        return 0;
    }


}
Run Code Online (Sandbox Code Playgroud)

然后只需循环分区以使事件按顺序排列 - 在实践中我发现这是可行的。

                ConsumerRecords<String, String> events = consumer.poll(500);
                int totalEvents = events.count();
                log.debug("Polling topic - recieved " + totalEvents + " events");
                if (totalEvents == 0) {
                    break;  // no more events
                }

                List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);

                int cnt = 0;
                // Each list is removed when it is no longer needed
                while (!orderEvents.isEmpty() && sent < max) {
                    for (int j = 0; j < orderEvents.size(); j++) {
                        List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
                        // The list contains no more events, or none in our time range, remove it
                        if (subList.size() < cnt + 1) {
                            orderEvents.remove(j);
                            log.debug("exhausted partition - removed");
                            j--;
                            continue;
                        }
                        ConsumerRecord<String, String> event = subList.get(cnt);
                        cnt++
}
Run Code Online (Sandbox Code Playgroud)