fre*_*lys 5 spring spring-boot spring-cloud-stream spring-kafka
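I am using Spring Cloud Stream. I want to save messages and retry publishing them to the topic when the Kafka server goes down, but the MessageChannel send() method always returns true, even when the Kafka/ZooKeeper server is stopped.
Can anyone help?
Update: contents of application.yml: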
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          zk-nodes: localhost
          mode: raw
        bindings:
          output:
            producer:
              sync: true
      bindings:
        output:
          destination: topic-notification
          content-type: application/json
Code:
@Service
public class SendToKafka {

    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);
                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}
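To make the intended behaviour concrete, here is a minimal sketch of the save-and-retry logic, written without Spring so that it can be run on its own. Everything in it is an assumption for illustration: `RetryingSender` is a hypothetical class, the `Predicate<String>` stands in for `source.output().send(...)`, and the in-memory queue stands in for `notificationFileService.writeTofile(...)`. It treats both a `false` return and a thrown `RuntimeException` as a failed send, because the `MessageChannel.send` contract allows an implementation to signal a failure by throwing instead of returning `false`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical helper: keeps messages that could not be sent and retries them later. */
class RetryingSender {

    // Stand-in for source.output().send(...): may return false or throw on failure.
    private final Predicate<String> transport;

    // Stand-in for notificationFileService.writeTofile(...): failed messages wait here.
    private final Deque<String> pending = new ArrayDeque<>();

    RetryingSender(Predicate<String> transport) {
        this.transport = transport;
    }

    /** Tries to send; on a false return or an exception the message is kept for retry. */
    boolean send(String message) {
        if (trySend(message)) {
            return true;
        }
        pending.addLast(message);
        return false;
    }

    /** Re-sends kept messages in order, stopping at the first failure; returns the number delivered. */
    int retryPending() {
        int delivered = 0;
        while (!pending.isEmpty() && trySend(pending.peekFirst())) {
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int pendingCount() {
        return pending.size();
    }

    // A thrown RuntimeException is treated the same way as a false return value.
    private boolean trySend(String message) {
        try {
            return transport.test(message);
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

In the service above, `send` would be called in place of the direct `source.output().send(...)` call, and `retryPending` would be triggered once the broker is reachable again, for example from a scheduled task. Whether the binder actually reports a broker outage through either signal is exactly the open point of this question.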