Spring Cloud Stream MessageChannel send() always returns true

fre*_*lys 5 spring spring-boot spring-cloud-stream spring-kafka

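I am using Spring Cloud Stream. When the Kafka server goes down, I want to save the messages and retry publishing them to the topic later. However, the MessageChannel send() method always returns true, even when the Kafka/Zookeeper server is stopped.

Can anyone help?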

Update: contents of application.yml:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                    mode: raw
                bindings:
                    output:
                        producer:
                            sync: true
            bindings:
                output:
                    destination: topic-notification
                    content-type: application/json

Code:

@Service
public class SendToKafka {
    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);

                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}

Gar*_*ell 2

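Kafka is asynchronous by default; you need to set sync to true. See the binder producer properties:

sync

Whether the producer is synchronous.

Default: false.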
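To illustrate the difference between the two modes, here is a minimal plain-Java sketch with no Spring or Kafka dependency. `FakeProducer`, `brokerDown`, `sendAsync` and `sendSync` are hypothetical names made up for this illustration, not binder APIs. It assumes, as I understand the binder's behaviour (not verified against a specific version), that an asynchronous send reports success as soon as the message is handed off, while a synchronous send waits for the broker's acknowledgement and reports a failure as an exception rather than as a `false` return value.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SendSemanticsSketch {

    /** Hypothetical stand-in for a broker client: the future completes when the broker acks. */
    interface FakeProducer {
        CompletableFuture<Void> publish(String payload);
    }

    /** Async mode: hand the record off and report success without waiting for the ack. */
    static boolean sendAsync(FakeProducer producer, String payload) {
        producer.publish(payload); // the result is ignored, so a broker failure is invisible here
        return true;
    }

    /** Sync mode: block until the ack arrives; a failure surfaces as an exception, not as false. */
    static boolean sendSync(FakeProducer producer, String payload) {
        try {
            producer.publish(payload).get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ack", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    /** A producer whose broker is unreachable: every publish fails. */
    static FakeProducer brokerDown() {
        return payload -> {
            CompletableFuture<Void> result = new CompletableFuture<>();
            result.completeExceptionally(new IOException("broker unreachable"));
            return result;
        };
    }

    public static void main(String[] args) {
        FakeProducer down = brokerDown();
        System.out.println("async: " + sendAsync(down, "hello"));
        try {
            sendSync(down, "hello");
        } catch (IllegalStateException e) {
            System.out.println("sync failed: " + e.getCause().getMessage());
        }
    }
}
```

If the real binder behaves like this model, checking the boolean returned by send() is not enough even with sync enabled; the save-and-retry logic would belong in the exception handler. Note also that a real Kafka producer may block for some time before it reports an unreachable broker.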

  • I have already tried that, but the producer still returns true. (6 upvotes)