kafka异步发送不是真的异步吗?

Aze*_*zel 5 apache-kafka kafka-producer-api

我正在使用 kafka-client 1.0.0 库中的 KafkaProducer,根据文档,该方法Future<RecordMetadata> send(ProducerRecord<K, V> record)将立即返回,但实际上,但看起来不是。该方法还调用doSend同一个类中的另一个方法(请参阅下面的代码片段),并且在该方法内部,它正在等待主题的元数据,我认为这是必要的,因为它与分区等相关。

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
Run Code Online (Sandbox Code Playgroud)

还有其他完全异步的选择吗?这个问题为什么我希望它是完全异步的,因为如果其中的某些服务器bootstrap.servers没有响应,它将等待基于的时间max.block.ms,但我实际上并不希望它等待,而是,我只是希望它回来。

我看到它会立即返回的文档: KafkaProducer java doc

发送是异步的,一旦记录已存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会在每条记录之后阻塞等待响应。

rad*_*dai 6

您的分析是正确的 - kafka 有一个(有时)阻塞“非阻塞”API。这已经被提出过 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+ Producer.send%28%29+should+not+block +on+metadata+update - 但是从来没有优先考虑。


Chr*_*ken 1

它尽可能地异步。Kafka 维护一个元数据缓存,该缓存偶尔会更新以保持最新状态,在您的场景中,您仅在该缓存过时或未初始化时等待。一旦缓存初始化,就无需等待。

如果您的代码有一个即将执行的 send() 必须尽快执行,您可以尝试向生产者发送准备性的partitionsFor() 方法调用,以查看是否无法在需要时强制更新缓存。

除此之外,总会有潜在的、偶尔的等待元数据缓存刷新的情况。