Aze*_*zel 5 apache-kafka kafka-producer-api
我正在使用 kafka-client 1.0.0 库中的 KafkaProducer,根据文档,该方法Future<RecordMetadata> send(ProducerRecord<K, V> record)将立即返回,但实际上,但看起来不是。该方法还调用doSend同一个类中的另一个方法(请参阅下面的代码片段),并且在该方法内部,它正在等待主题的元数据,我认为这是必要的,因为它与分区等相关。
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
Run Code Online (Sandbox Code Playgroud)
还有其他完全异步的选择吗?这个问题为什么我希望它是完全异步的,因为如果其中的某些服务器bootstrap.servers没有响应,它将等待基于的时间max.block.ms,但我实际上并不希望它等待,而是,我只是希望它回来。
我看到它会立即返回的文档: KafkaProducer java doc
发送是异步的,一旦记录已存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会在每条记录之后阻塞等待响应。
您的分析是正确的 - kafka 有一个(有时)阻塞“非阻塞”API。这已经被提出过 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+ Producer.send%28%29+should+not+block +on+metadata+update - 但是从来没有优先考虑。
它尽可能地异步。Kafka 维护一个元数据缓存,该缓存偶尔会更新以保持最新状态,在您的场景中,您仅在该缓存过时或未初始化时等待。一旦缓存初始化,就无需等待。
如果您的代码有一个即将执行的 send() 必须尽快执行,您可以尝试向生产者发送准备性的partitionsFor() 方法调用,以查看是否无法在需要时强制更新缓存。
除此之外,总会有潜在的、偶尔的等待元数据缓存刷新的情况。
| 归档时间: |
|
| 查看次数: |
9717 次 |
| 最近记录: |