Spring Kafka异步发送调用块

Car*_*sto 11 spring asynchronous send producer apache-kafka

我正在使用Spring-Kafka版本1.2.1,当Kafka服务器关闭/无法访问时,异步发送调用阻塞一段时间.它似乎是TCP超时.代码是这样的:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});
Run Code Online (Sandbox Code Playgroud)

我已经快速浏览了Spring-Kafka代码,它似乎只是将任务传递给kafka客户端库,将回调交互转换为未来的对象交互.看看kafka客户端库,代码变得更加复杂,我没有花时间去理解它,但我想它可能在同一个线程中进行远程调用(元数据,至少?).

作为用户,我期望返回未来的Spring-Kafka方法立即返回,即使远程kafka服务器无法访问.

如果我的理解是错误的,或者如果这是一个错误,任何确认将是受欢迎的.我现在最终将它变为异步.

另一个问题是Spring-Kafka文档在开始时说它提供了同步和异步发送方法.我找不到任何不返回期货的方法,也许文档需要更新.

如果需要,我很乐意提供任何进一步的细节.谢谢.

Jor*_*e C 7

除了配置类上的@EnableAsync注释之外,如果您调用此代码,则需要在方法上使用@Async注释.

http://www.baeldung.com/spring-async

这里有一些代码片段.Kafka制作人配置:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}
Run Code Online (Sandbox Code Playgroud)

而制作人本身:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 我同意你的观点,对我来说,你提供 future 没有意义,但无论如何我都必须放置注释。在我们的例子中,放置这两个注释使其工作起来就像一个魅力。我将编辑响应并添加一些代码片段。 (2认同)

Gee*_*nte 7

如果我查看 KafkaProducer 本身,发送消息有两个部分:

  1. 将消息存储到内部缓冲区中。
  2. 将消息从缓冲区上传到 Kafka。

KafkaProducer 对于第二部分是异步的,而不是第一部分。

send() 方法仍然可以在第一部分被阻塞并最终抛出 TimeoutExceptions,例如:

  • 主题的元数据没有被缓存或陈旧,因此生产者尝试从服务器获取元数据以了解主题是否仍然存在以及它有多少分区。
  • 缓冲区已满(默认为 32MB)。

如果服务器完全没有响应,您可能会遇到这两个问题。

更新:

我在 Kafka 2.2.1 中测试并确认了这一点。看起来这种行为在 2.4 和/或 2.6 中可能有所不同:KAFKA-3720