fetch.max.wait.ms 与 poll() 方法的参数

kar*_*ani 5 apache-kafka kafka-consumer-api

在我提出问题之前,我想指出这里已经提出了一个类似的问题但尚未得到解答,因此我再次提问。请不要将此标记为重复,因为前面提到的问题没有任何答案。

我对fetch.max.wait.ms和有疑问consumer.poll(<value>)。这是我在对上述配置的研究中发现的

poll() 方法采用超时参数。这指定了 poll 需要多长时间才能返回,有或没有数据

如果您将 fetch.max.wait.ms 设置为 100 ms 并将 fetch.min.bytes 设置为 1 MB,Kafka 将收到来自消费者的 fetch 请求,并在有 1 MB 数据要返回时或在 100 之后响应数据ms,以先发生者为准。

所以我的问题是,会发生什么情况fetch.max.wait.ms=500consumer.poll(200)以及fetch.min.bytes= 500但是券商没有足够的数据以恢复为集fetch.min.bytes

sun*_*007 8

获取最小字节数

此属性允许使用者指定在获取记录时希望从代理接收的最小数据量。如果代理收到来自消费者的记录请求,但新记录的字节数少于 fetch.min.bytes,则代理将等到有更多消息可用,然后再将记录发送回消费者。

fetch.max.wait.ms

它会通知代理等待,直到有足够的数据要发送,然后再响应消费者。

示例:如果将 fetch.max.wait.ms 设置为 100 ms,将 fetch.min.bytes 设置为 1 MB,Kafka 将收到来自消费者的提取请求,并在有 1 MB 数据要返回时响应数据,或者100 毫秒后,以先发生者为准。

上面两个参数控制broker同时向consumer响应消息。

轮询(超时)

基本上,如果代理中没有数据可供使用, poll() 控制 poll() 将阻塞多长时间。

消费者端请求轮询以获取 Broker 响应的记录。它调用 fetchrecords() ,如果代理中已有可用记录且满足上述参数 fetch.min.bytes 和 fetch.max.wait.ms ,它将立即响应,否则等到给定的超时返回空,以防代理中没有可用记录。

下面解释了 KafkaConsumer 类中的 pollForfetches 方法

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

        // if data is available already, return it immediately
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in the poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        client.poll(pollTimeout, startMs, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }
Run Code Online (Sandbox Code Playgroud)

如果 fetch.min.bytes= 500 且 fetch.max.wait.ms=500,这意味着代理将在有 500 字节数据要返回时或 500 毫秒后响应消费者,以先发生者为准。消费者端轮询将每 200 毫秒调用一次 fetchedRecords 以接收代理提供的任何消息。

  • 假设时间线是这样的: T=0:consumer.poll(200); T=200:获取没有任何数据的返回,因为代理没有 500kB 。T=201: 消费者.poll(200); T=401:获取没有任何数据的返回,因为代理没有 500kB 。T=402: 消费者.poll(200); T=500: (fetch.max.wait.ms) &lt;这里发生了什么&gt;?即使代理拥有的数据小于 500kB,轮询也会返回吗? (2认同)

Yon*_*oni 5

在我看来,您建议的配置fetch.max.wait.ms > pollDuration不是一个健康的配置,因为它会导致奇怪的行为。我将尝试用一个例子来解释。

首先,我们假设一个健康的配置,其中fetch.max.wait.ms小于pollDuration
如果您进行轮询并且根本没有数据通过该主题,您将等到pollDuration通过然后poll()返回。如果您的主题中有大量数据,您将获取它并poll()立即返回。

有趣的用例是,如果您进行轮询并且主题中有少量数据,小于fetch.min.bytes,或者主题中根本没有数据,但数据在您轮询时到达。
该用例如下:

  • 时间T0:您开始轮询
  • 时间 T1:您收到主题中的一些字节,但还不够。

我认为这是理所当然的T1 < T0 + pollDuration。现在,了解接下来会发生什么的关键是了解 所施加的“最后期限”从fetch.max.wait.ms时间 T0 开始计算。因此,我们有两个选择:

  1. T1 < T0 + fetch.max.wait.ms- 您将继续等待一段时间,此时T0 + fetch.max.wait.mspoll 方法将返回少量数据。请注意,根据我之前的假设,这次是在 之前T0 + pollDuration
  2. T1 > T0 + fetch.max.wait.ms- 在这种情况下,Kafka 认为不需要再等待,该poll()方法将在时间 T1 返回少量数据。

现在,回到您的用例。由于fetch.max.wait.ms大于pollDuration,您将看到一些奇怪的行为。这是一种可能的用例:

  • 时间T0:你开始轮询,主题有少量数据。
  • 时间T1 = T0 + pollDuration:完成轮询而不获取数据。
  • 时间T2:您再次开始轮询。
  • 时间T3 = T0 + fetch.max.wait.ms:poll()返回并且您消耗少量数据。

请注意,如果fetch.max.wait.ms比 then 足够大,pollDuration您可能会看到多次轮询迭代,但没有获取任何数据,直到最终获取它。在您的示例中,您将轮询两次,总共 400 毫秒并空手返回,然后在第三次轮询尝试时,您将在 100 毫秒后获取数据。


Dea*_*ool 3

文档中如果没有提供足够的数据,服务器将被阻塞fetch.min.bytes500因此,在您的情况下,如果没有足够的数据,服务器将等待毫秒

即将poll 公开 ConsumerRecords 轮询(长时间超时)根据KafkaConsumer文档,由于没有足够的数据,消费者轮询将在每200毫秒为空,直到代理拥有足够的数据

timeout - 如果缓冲区中没有数据可用,则在轮询中等待所花费的时间(以毫秒为单位)。如果为 0,则立即返回缓冲区中当前可用的任何记录,否则返回空。一定不能为负数。

fetch.max.wait.ms

如果没有足够的数据来立即满足 fetch.min.bytes 给出的要求,则服务器在应答提取请求之前将阻塞的最长时间。

获取最小字节数

服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累足够多的数据,然后再答复请求。默认设置为 1 字节,意味着只要有一个字节的数据可用,或者获取请求在等待数据到达时超时,就会立即应答获取请求。将其设置为大于 1 将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会带来一些额外的延迟。