Swa*_*pta 1 round-robin partitioner apache-kafka kafka-producer-api
I am trying to use Kafka's RoundRobinPartitioner class for distributing messages evenly across all the partitions. My Kafka topic configuration is as follows:
name: multischemakafkatopicodd
number of partitions: 16
replication factor: 2
Say, if I am producing 100 messages then each partition should have 6 or 7 messages. But, I am getting something similar to this:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
Run Code Online (Sandbox Code Playgroud)
I thought that may be I am not producing enough messages, so I tried with 1M records and set the number of partitions to an odd number:
topic: multischemakafkatopicodd
number of partitions: 31
replication factor: 2
...and I got this. This time the number of messages in each partition is somewhat evenly distributed.
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
Run Code Online (Sandbox Code Playgroud)
Again I did the same test but decreased the number of partitions to 8 and I got this result where we can clearly see that some partitions have close to 15K messages while others have around 10K:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
Run Code Online (Sandbox Code Playgroud)
Am I doing anything wrong or is this how it is supposed to work? Why is there such an unequal distribution of messages?
If anyone can help me out, that would be great. Thanks.
据我了解,分区器运行良好。但您必须了解生产者为了最大限度地提高性能而进行的优化:
生产者不会为每个发送调用将每条消息生成到不同的分区,因为这有点过分了。
Round-Robin
保证类似的分布,但可以批量发送。这意味着,它将根据的代码中进行的(不是模数!)操作来缓冲发往分区的一定数量的消息:remainder
RoundRobinPartitioner
int part = Utils.toPositive(nextValue) % availablePartitions.size();
Run Code Online (Sandbox Code Playgroud)
nextValue
是AtomicInteger
每次分区/发送调用时加 1 的值。因此,假设在此过程中没有任何分区被声明为不可用,则余数也将始终加一(以循环方式,例如有 4 个分区:)0-1-2-3-0-1-2-3-...
。如果发生这种情况,循环可能如下所示0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...
(消息数计数器从 0 - 开始new AtomicInteger(0)
)
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...
Run Code Online (Sandbox Code Playgroud)
当生成第 9 条消息时,第一个分区的缓冲区已满(因为它已经容纳了 3 条消息),因此准备发送到 kafka。如果您立即停止该进程,4 个分区将如下所示:
Partition Offset
0 3
1 0
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
当生成第 10 条消息时,第二个分区的缓冲区也将准备好从线路中发送,主题如下所示:
Partition Offset
0 3
1 3
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
在现实生活中,缓冲区通常保存大量消息(这也可以调整)。举例来说,存储了 1000 条消息。对于相同的场景,分区将如下所示:
Partition Offset
0 1000
1 1000
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
因此增加了分区之间的“视觉”差异。批量大小/缓冲区大小越大,它就越臭名昭著。
这与生产者partitioner
线程本身的性质有关:默认情况下,它不会独立发送每条消息,而是存储它们,以便在每次代理调用时发送多条消息,从而优化系统性能。
批处理是效率的重要驱动因素之一,为了启用批处理,Kafka 生产者将尝试在内存中累积数据并在单个请求中发送更大的批次
如果生产者停止/启动,这种不平衡可能会更加臭名昭著,因为无论之前选择的分区如何,它都会重新启动该机制(因此它可以开始发送到在停止之前选择的同一分区,从而增加与其他分区的差异)上次执行的非选举分区)。
在新的执行中,缓冲区将全部为空,因此无论哪个分区接收最多,进程都会重新启动。
因此,您可以在此处停止该过程:
Partition Offset
0 1000
1 1000
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
保存每个主题的消息数量计数器的映射将重新启动,因为它不是代理的一部分,而是生产者的Partitioner 类的一部分。如果生产者没有正确关闭和/或刷新,那些缓存的消息也将丢失。因此,在这种情况下,您得到的是先前逻辑的重复:
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)
Run Code Online (Sandbox Code Playgroud)
这会在某个时刻导致这样的情况:
Partition Offset
0 2000
1 2000
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
它是由发送过程的非连续执行产生的不平衡,但它超出了 的范围RoundRobinPartitioner
,其本质是基于连续过程(不间断)的。
您可以通过在发送消息时检查每个分区的偏移量来验证此行为:只有当选定的分区存储了n条消息时,下一个选定的分区才会获取其批次的n条消息。
注意:示例中显示的数字引用了“完美”场景;在现实生活中,消息也可以被撤销、压缩、失败、刷新,无论缓冲区大小如何,分区不可用,...导致偏移量,如您的问题中所示。
最后一个带有刷新场景的示例:
Partition Offset
0 1000
1 1000
2 0
3 0
Run Code Online (Sandbox Code Playgroud)
进程已停止,但生产者已正确关闭并刷新其消息,因此主题如下所示:
Partition Offset
0 1997
1 1996
2 999
3 998
Run Code Online (Sandbox Code Playgroud)
该过程重新启动。刷新第一个分区的缓冲区后,将如下所示:
Partition Offset
0 2997
1 1996
2 999
3 998
Run Code Online (Sandbox Code Playgroud)
因此增加了关于该机制的“公平性”的混乱。但这不是它的错,因为分区程序的映射、计数器和缓冲区中没有持久性。如果您让该过程不间断地执行几天,您会发现它确实以“近乎平等”的方式平衡了消息。
RoundRobinPartitioner
的相关方法:@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/*remainder calculus in order to select next partition*/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic)
{
/*Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch*/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3260 次 |
最近记录: |