use*_*022 6 apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?
目前我需要维护订购,但这意味着拥有一个带有单个消费者线程的分区.我想将其更改为多个分区以增加并行性,但不知何故"让它们按顺序排列".
有什么想法吗?谢谢.
Mic*_*oll 12
在这种情况下,您遇到两个问题:
我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?
简短的回答是:不,当您从具有多个分区的Kafka主题中读取时,仍然无法实现全局顺序.
此外,"分区排序"表示"基于分区中消息的偏移的分区排序".排序保证与消息的时间戳无关.
最后,只有在max.in.flight.requests.per.connection == 1以下情况下才能保证订购:
Apache Kafka文档中的生产者配置设置 :(
max.in.flight.requests.per.connection默认值:):5客户端在阻止之前将在单个连接上发送的最大未确认请求数.请注意,如果此设置设置为大于1并且发送失败,则存在由于重试而导致消息重新排序的风险(即,如果启用了重试).
请注意,此时我们正在讨论消费者行为(这是您最初的问题开始时)和Kafka中的生产者行为的组合.
如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?
即使使用时间戳功能,我们仍然无法实现"每个分区中的每条消息都保持顺序".为什么?因为迟到/无序消息的可能性.
分区按偏移量排序,但不保证按时间戳排序.在实践中,分区的以下内容是完全可能的(时间戳通常是毫秒 - 从此纪元开始):
Partition offsets 0 1 2 3 4 5 6 7 8
Timestamps 15 16 16 17 15 18 18 19 17
^^
oops, late-arriving data!
Run Code Online (Sandbox Code Playgroud)
什么是迟到/无序消息?想象一下,你有遍布世界各地的传感器,所有传感器都测量它们的局部温度,并将最新测量结果发送到Kafka主题.某些传感器可能具有不可靠的互联网连接,因此它们的测量可能会延迟几分钟,几小时甚至几天.最终他们的延迟测量将进入卡夫卡,但他们将"迟到".对于城市中的移动电话也是如此:有些可能耗尽电池/能源,需要在发送数据之前进行充电,有些可能因为您在地下等地方而失去互联网连接.
假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?
理论上是的,但在实践中这是相当困难的."我们接收所有消息"的假设对于流式传输系统来说实际上是具有挑战性的(即使对于批处理系统,尽管可能在这里通常简单地忽略了迟到数据的问题).你永远不知道你是否真的收到了"所有消息" - 因为可能会有迟到的数据.如果您收到迟到的消息,您希望发生什么?再次重新处理/重新排序"全部"消息(现在包括迟到的消息),或者忽略迟到的消息(从而计算出错误的结果)?从某种意义上说,任何通过"让它们排序所有"而实现的全球排序要么是非常昂贵,要么是最大努力.
我没有使用 Kafka 流 - 但可以使用普通消费者来做到这一点。
首先对分区进行排序 - 这假设您已经在每个您想要的分区中查找偏移量或使用 Consumer Group 来执行此操作。
private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {
Set<TopicPartition> pollPartitions = events.partitions();
List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
for (TopicPartition tp : pollPartitions) {
orderEvents.add(events.records(tp));
}
// order the list by the first event, each list is ordered internally also
orderEvents.sort(new PartitionEventListComparator());
return orderEvents;
}
/**
* Used to sort the topic partition event lists so we get them in order
*/
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {
@Override
public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
long c1 = list1.get(0).timestamp();
long c2 = list2.get(0).timestamp();
if (c1 < c2) {
return -1;
} else if (c1 > c2) {
return 1;
}
return 0;
}
}
Run Code Online (Sandbox Code Playgroud)
然后只需循环分区以使事件按顺序排列 - 在实践中我发现这是可行的。
ConsumerRecords<String, String> events = consumer.poll(500);
int totalEvents = events.count();
log.debug("Polling topic - recieved " + totalEvents + " events");
if (totalEvents == 0) {
break; // no more events
}
List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);
int cnt = 0;
// Each list is removed when it is no longer needed
while (!orderEvents.isEmpty() && sent < max) {
for (int j = 0; j < orderEvents.size(); j++) {
List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
// The list contains no more events, or none in our time range, remove it
if (subList.size() < cnt + 1) {
orderEvents.remove(j);
log.debug("exhausted partition - removed");
j--;
continue;
}
ConsumerRecord<String, String> event = subList.get(cnt);
cnt++
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5092 次 |
| 最近记录: |