Kafka使用者具有一个配置max.poll.records,该配置可控制在对poll()的单次调用中返回的最大记录数,其默认值为500。我将其设置为非常高的数字,以便可以在一次轮询中获取所有消息。但是,民意测验在一个呼叫中仅返回数千条消息(大约6000条),即使该主题还有更多。
如何进一步增加单个使用者读取的消息数量?
Dea*_*ool 10
消费者poll()将依赖于文档中的这些属性,如果我们看到max.partition.fetch.bytes 消费者仍然轮询大于限制的批处理并且可以限制使用,fetch.max.bytes而消费者仍然可以轮询大于限制的批处理,但是主题配置和代理配置中还有另一个属性对此进行了限制,
因此,一种方法是根据所需的批次大小来增加所有这些属性
在Broker config message.max.bytes中,默认值为1000012
Kafka允许的最大记录批量。如果增加此数量,并且有一些消费者的年龄大于0.10.2,则消费者的获取大小也必须增加,以便他们可以获取如此大的记录批次。
在最新的消息格式版本中,为了提高效率,始终将记录分组。在以前的消息格式版本中,未压缩的记录不会分组,并且在这种情况下,此限制仅适用于单个记录。
可以使用主题级别max.message.bytes配置对每个主题进行设置。
在Topic config max.message.bytes中,默认值为1000012
Kafka允许的最大记录批量。如果增加此数量,并且有一些消费者的年龄大于0.10.2,则消费者的获取大小也必须增加,以便他们可以获取如此大的记录批次。
在最新的消息格式版本中,为了提高效率,始终将记录分组。在以前的消息格式版本中,未压缩的记录不会分组,并且在这种情况下,此限制仅适用于单个记录。
在Consumer config max.partition.fetch.bytes中,默认值为1048576
服务器将返回的每个分区的最大数据量。记录由消费者分批提取。如果提取的第一个非空分区中的第一个记录批处理大于此限制,则仍将返回该批处理以确保使用者可以取得进展。代理接受的最大记录批处理大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。请参阅fetch.max.bytes以限制使用者请求大小
在Consumer Config fetch.max.bytes中,默认值为52428800
服务器为获取请求应返回的最大数据量。使用者将批量提取记录,并且如果所提取的第一个非空分区中的第一个记录批次大于此值,则仍将返回记录批次以确保使用者可以取得进展。因此,这不是绝对最大值。代理接受的最大记录批处理大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。请注意,使用者并行执行多个提取。
您的有效负载很可能受限制max.partition.fetch.bytes,默认情况下为1MB。请参阅Kafka Consumer配置。
这是很好的详细说明:
最大拼版字节数
此属性控制服务器将为每个分区返回的最大字节数。默认值为1 MB,这意味着当KafkaConsumer.poll()返回ConsumerRecords时,记录对象将为分配给使用者的每个分区最多使用max.partition.fetch.bytes。因此,如果一个主题有20个分区,而您有5个使用者,则每个使用者将需要有4 MB的内存可用于ConsumerRecords。实际上,您将要分配更多的内存,因为如果组中的其他使用者失败,则每个使用者将需要处理更多的分区。最高 partition.fetch.bytes必须大于代理将接受的最大消息(由代理配置中的max.message.size属性确定),否则代理可能会有一些使用者无法使用的消息,在这种情况下消费者会停下来尝试阅读它们。设置max.partition.fetch.bytes时,另一个重要的考虑因素是使用者处理数据所花费的时间。您还记得,使用者必须足够频繁地调用poll()以避免会话超时和随后的重新平衡。如果单个poll()返回的数据量非常大,则消费者可能需要花费更长的时间进行处理,这意味着它将无法及时到达轮询循环的下一个迭代,从而避免会话超时。如果发生这种情况,则两个选项要么降低最大值。partition.fetch.bytes或增加会话超时。如果单个poll()返回的数据量非常大,则消费者可能需要花费更长的时间进行处理,这意味着它将无法及时到达轮询循环的下一个迭代,从而避免会话超时。如果发生这种情况,则两个选项要么降低最大值。partition.fetch.bytes或增加会话超时。如果单个poll()返回的数据量非常大,则消费者可能需要花费更长的时间进行处理,这意味着它将无法及时到达轮询循环的下一个迭代,从而避免会话超时。如果发生这种情况,则两个选项要么降低最大值。partition.fetch.bytes或增加会话超时。
希望能帮助到你!
| 归档时间: |
|
| 查看次数: |
11659 次 |
| 最近记录: |