Apache Kafka Producer配置:'request.timeout.ms'VS.'max.block.ms'属性

rug*_*den 7 apache-kafka kafka-producer-api

鉴于以下同步kafka生产者

Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)

帮助我理解request.timeout.msmax.block.ms生产者配置之间的区别.是否包括所有重试的最长时间?或者每次重试都有自己的超时?

Eug*_*ene 11

我发现接受的答案有点“薄弱”,所以这可能会对其他人有所帮助。

您的代码中有两件重要的事情:

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)

producer.send(producerRecord)- 由两部分组成:阻塞和非阻塞。阻塞部分是由一些“部分”组成的:

- Request metadata about brokers from zookeeper 
- Serialize message
- Choose a Partition 
- Place Message in the RecordAccumulator
Run Code Online (Sandbox Code Playgroud)

现在,通常前三个步骤很快(第一个步骤在初始调用后缓存),而第四个步骤可能需要时间。发生这种情况是因为RecordAccumulator空间有限 ( buffer.memory),而此时有多少空间取决于生产者客户端中的另一个Sender Thread线程(称为)。如果这个线程做得不好(从RecordAccumulator代理检索消息并向代理发送消息;顺便说一句,所有这些都有指标),您的send方法将被阻塞(中没有空间RecordAccumulator),直到空间可用。

所有这 4 个步骤最多允许被阻止max.block.ms。这就是 KIP 在谈论时的含义:

  • 元数据获取时间(从 Zookeeper 获取有关代理的元数据)
  • 缓冲区满块时间(我所说的时间)
  • 序列化时间(定制序列化器)
  • 分区时间(定制分区器)

还有delivery.timeout.ms. 这是消息发送到分区之前等待的总时间,它包括:将记录推送到批次的时间(在RecordAccumulator)+ 获取 ack 的时间(例如all等待消息跨副本复制)+将消息发送到代理的时间,包括所有重试(如果有)。

您可以将其视为从send方法向下到到达所有副本并发送回 ack 所需的时间。所有这一切时间都必须低于delivery.timeout.ms


在解释之前request.timeout.ms,恕我直言,了解它们max.in.flight.requests.per.connection是什么很重要,因为它们之间有一些联系。假设一个批次已准备好从 发送到代理RecordAccumulator(因为它batch.sizelinger.ms已完成)。该批次是否由所谓的“发送者线程”(客户端本身的线程,并且是!=调用send方法的线程)获取或不由 定义max.in.flight.requests.per.connection

您可以在任何时间点有任意数量的max.in.flight.requests.per.connection 并发请求处于活动状态。一个稍微容易一点的思考方式是这样的。“发送者线程”有一个它不断执行的特定循环,用伪代码表示:

while(true) {
    // check if there are batches to send
    // get the batches to send to the brokers
    // make requests to the broker
    // poll connections
    // etc
}
Run Code Online (Sandbox Code Playgroud)

假设这是第一批发送。“Sender Thread”递增max.in.flight.requests.per.connection,使其变为1;获取批次并将其发送给经纪人。此时它不会等待确认,而是返回到循环。依此类推,直到达到 5( 的默认值max.in.flight.requests.per.connection)。

现在假设有批次要发送到代理,发送者线程不会接受,因为它没有可用的请求(我们现在最多 5 个)。相反,它会“轮询连接”:它会向代理询问先前发送的结果,其余的解释在这里


有了所有这些背景,是时候看看 了request.timeout.ms,这实际上很容易。当客户端轮询连接时 - 尝试从代理获取每个正在进行的请求的响应,它可以在request.timeout.ms(默认情况下为 30 秒)内完成此操作。如果我们重试,该值将被重置。


Par*_*esh 9

request.timeout.ms 用于超时请求,我会将其设置为我可以等待响应的最长时间。

max.block.ms 用于生产者阻塞缓冲时间、序列化时间等。

有关详细信息,请查看这一点。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient