rug*_*den 7 apache-kafka kafka-producer-api
鉴于以下同步kafka生产者
Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);
KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);
//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)
帮助我理解request.timeout.ms和max.block.ms生产者配置之间的区别.是否包括所有重试的最长时间?或者每次重试都有自己的超时?
Eug*_*ene 11
我发现接受的答案有点“薄弱”,所以这可能会对其他人有所帮助。
您的代码中有两件重要的事情:
KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);
//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)
producer.send(producerRecord)- 由两部分组成:阻塞和非阻塞。阻塞部分是由一些“部分”组成的:
- Request metadata about brokers from zookeeper
- Serialize message
- Choose a Partition
- Place Message in the RecordAccumulator
Run Code Online (Sandbox Code Playgroud)
现在,通常前三个步骤很快(第一个步骤在初始调用后缓存),而第四个步骤可能需要时间。发生这种情况是因为RecordAccumulator空间有限 ( buffer.memory),而此时有多少空间取决于生产者客户端中的另一个Sender Thread线程(称为)。如果这个线程做得不好(从RecordAccumulator代理检索消息并向代理发送消息;顺便说一句,所有这些都有指标),您的send方法将被阻塞(中没有空间RecordAccumulator),直到空间可用。
所有这 4 个步骤最多允许被阻止max.block.ms。这就是 KIP 在谈论时的含义:
还有delivery.timeout.ms. 这是消息发送到分区之前等待的总时间,它包括:将记录推送到批次的时间(在RecordAccumulator)+ 获取 ack 的时间(例如all等待消息跨副本复制)+将消息发送到代理的时间,包括所有重试(如果有)。
您可以将其视为从send方法向下到到达所有副本并发送回 ack 所需的时间。所有这一切时间都必须低于delivery.timeout.ms。
在解释之前request.timeout.ms,恕我直言,了解它们max.in.flight.requests.per.connection是什么很重要,因为它们之间有一些联系。假设一个批次已准备好从 发送到代理RecordAccumulator(因为它batch.size或linger.ms已完成)。该批次是否由所谓的“发送者线程”(客户端本身的线程,并且是!=调用send方法的线程)获取或不由 定义max.in.flight.requests.per.connection。
您可以在任何时间点有任意数量的max.in.flight.requests.per.connection 并发请求处于活动状态。一个稍微容易一点的思考方式是这样的。“发送者线程”有一个它不断执行的特定循环,用伪代码表示:
while(true) {
// check if there are batches to send
// get the batches to send to the brokers
// make requests to the broker
// poll connections
// etc
}
Run Code Online (Sandbox Code Playgroud)
假设这是第一批发送。“Sender Thread”递增max.in.flight.requests.per.connection,使其变为1;获取批次并将其发送给经纪人。此时它不会等待确认,而是返回到循环。依此类推,直到达到 5( 的默认值max.in.flight.requests.per.connection)。
现在假设有批次要发送到代理,发送者线程不会接受,因为它没有可用的请求(我们现在最多 5 个)。相反,它会“轮询连接”:它会向代理询问先前发送的结果,其余的解释在这里
有了所有这些背景,是时候看看 了request.timeout.ms,这实际上很容易。当客户端轮询连接时 - 尝试从代理获取每个正在进行的请求的响应,它可以在request.timeout.ms(默认情况下为 30 秒)内完成此操作。如果我们重试,该值将被重置。
request.timeout.ms 用于超时请求,我会将其设置为我可以等待响应的最长时间。
max.block.ms 用于生产者阻塞缓冲时间、序列化时间等。
有关详细信息,请查看这一点。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
| 归档时间: |
|
| 查看次数: |
5170 次 |
| 最近记录: |