kar*_*ani 5 apache-kafka kafka-consumer-api
在我提出问题之前,我想指出这里已经提出了一个类似的问题,但尚未得到解答,因此我再次提问。请不要将此标记为重复,因为前面提到的问题没有任何答案。
我对fetch.max.wait.ms
和有疑问consumer.poll(<value>)
。这是我在对上述配置的研究中发现的
poll() 方法采用超时参数。这指定了 poll 需要多长时间才能返回,有或没有数据
如果您将 fetch.max.wait.ms 设置为 100 ms 并将 fetch.min.bytes 设置为 1 MB,Kafka 将收到来自消费者的 fetch 请求,并在有 1 MB 数据要返回时或在 100 之后响应数据ms,以先发生者为准。
所以我的问题是,会发生什么情况fetch.max.wait.ms=500
,consumer.poll(200)
以及fetch.min.bytes= 500
但是券商没有足够的数据以恢复为集fetch.min.bytes
?
获取最小字节数
此属性允许使用者指定在获取记录时希望从代理接收的最小数据量。如果代理收到来自消费者的记录请求,但新记录的字节数少于 fetch.min.bytes,则代理将等到有更多消息可用,然后再将记录发送回消费者。
fetch.max.wait.ms
它会通知代理等待,直到有足够的数据要发送,然后再响应消费者。
示例:如果将 fetch.max.wait.ms 设置为 100 ms,将 fetch.min.bytes 设置为 1 MB,Kafka 将收到来自消费者的提取请求,并在有 1 MB 数据要返回时响应数据,或者100 毫秒后,以先发生者为准。
上面两个参数控制broker同时向consumer响应消息。
轮询(超时)
基本上,如果代理中没有数据可供使用, poll() 控制 poll() 将阻塞多长时间。
消费者端请求轮询以获取 Broker 响应的记录。它调用 fetchrecords() ,如果代理中已有可用记录且满足上述参数 fetch.min.bytes 和 fetch.max.wait.ms ,它将立即响应,否则等到给定的超时返回空,以防代理中没有可用记录。
下面解释了 KafkaConsumer 类中的 pollForfetches 方法
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
final long startMs = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
// We do not want to be stuck blocking in the poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
client.poll(pollTimeout, startMs, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
Run Code Online (Sandbox Code Playgroud)
如果 fetch.min.bytes= 500 且 fetch.max.wait.ms=500,这意味着代理将在有 500 字节数据要返回时或 500 毫秒后响应消费者,以先发生者为准。消费者端轮询将每 200 毫秒调用一次 fetchedRecords 以接收代理提供的任何消息。
在我看来,您建议的配置fetch.max.wait.ms > pollDuration
不是一个健康的配置,因为它会导致奇怪的行为。我将尝试用一个例子来解释。
首先,我们假设一个健康的配置,其中fetch.max.wait.ms
小于pollDuration
。
如果您进行轮询并且根本没有数据通过该主题,您将等到pollDuration
通过然后poll()
返回。如果您的主题中有大量数据,您将获取它并poll()
立即返回。
有趣的用例是,如果您进行轮询并且主题中有少量数据,小于fetch.min.bytes
,或者主题中根本没有数据,但数据在您轮询时到达。
该用例如下:
我认为这是理所当然的T1 < T0 + pollDuration
。现在,了解接下来会发生什么的关键是了解 所施加的“最后期限”从fetch.max.wait.ms
时间 T0 开始计算。因此,我们有两个选择:
T1 < T0 + fetch.max.wait.ms
- 您将继续等待一段时间,此时T0 + fetch.max.wait.ms
poll 方法将返回少量数据。请注意,根据我之前的假设,这次是在 之前T0 + pollDuration
。T1 > T0 + fetch.max.wait.ms
- 在这种情况下,Kafka 认为不需要再等待,该poll()
方法将在时间 T1 返回少量数据。现在,回到您的用例。由于fetch.max.wait.ms
大于pollDuration
,您将看到一些奇怪的行为。这是一种可能的用例:
poll()
返回并且您消耗少量数据。请注意,如果fetch.max.wait.ms
比 then 足够大,pollDuration
您可能会看到多次轮询迭代,但没有获取任何数据,直到最终获取它。在您的示例中,您将轮询两次,总共 400 毫秒并空手返回,然后在第三次轮询尝试时,您将在 100 毫秒后获取数据。
从文档中如果没有提供足够的数据,服务器将被阻塞fetch.min.bytes
。500
因此,在您的情况下,如果没有足够的数据,服务器将等待毫秒
即将poll
公开 ConsumerRecords 轮询(长时间超时)根据KafkaConsumer
文档,由于没有足够的数据,消费者轮询将在每200
毫秒为空,直到代理拥有足够的数据
timeout - 如果缓冲区中没有数据可用,则在轮询中等待所花费的时间(以毫秒为单位)。如果为 0,则立即返回缓冲区中当前可用的任何记录,否则返回空。一定不能为负数。
fetch.max.wait.ms
如果没有足够的数据来立即满足 fetch.min.bytes 给出的要求,则服务器在应答提取请求之前将阻塞的最长时间。
获取最小字节数
服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累足够多的数据,然后再答复请求。默认设置为 1 字节,意味着只要有一个字节的数据可用,或者获取请求在等待数据到达时超时,就会立即应答获取请求。将其设置为大于 1 将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会带来一些额外的延迟。
归档时间: |
|
查看次数: |
4000 次 |
最近记录: |