Kar*_*ert 6 activemq-classic stomp message-queue cluster-computing spring-boot
我们在不同的 VM 上有 2 个 ActiveMQ 节点(例如主机:amq1、amq2)。它们作为主/从集群链接。
我们想使用故障转移协议连接到这个集群。如何才能做到这一点?春季启动配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setRelayHost(activeMQProperties.getRelayHost())
.setRelayPort(activeMQProperties.getRelayPort());
}
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,这里我们只能设置一台主机和一个端口。我们怎么能设置这样的东西:
failover:(stomp://amq1:61613,stomp://amq2:61613)
Run Code Online (Sandbox Code Playgroud)
更新:目前使用的是 Spring Boot 2.3.5
我已经尝试过您在连接字符串中使用“故障转移”提到的选项,但它不起作用,并且发现一些线程甚至不支持 stomp。
所以最终的解决方案看起来像是一个自己的实现:两个具有主从配置的 ActiveMQ 服务器。
Spring配置(重要部分):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
private final ActiveMQProperties activeMQProperties;
// used by own round-robin implementation to connect to the current master ActiveMQ
private int index = 0;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setTcpClient(createTcpClient());
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.remoteAddress(socketAddressSupplier()),
new StompReactorNettyCodec());
}
private Supplier<? extends SocketAddress> socketAddressSupplier() {
Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
index++;
if (index >= activeMQProperties.getActiveMQServerList().size()) {
index = 0;
}
return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
};
return socketAddressSupplier;
}
}
Run Code Online (Sandbox Code Playgroud)
ActiveMQ 属性:
activemq:
activeMQServerList:
-
relayHost: host1
relayPort: 61613
-
relayHost: host2
relayPort: 61613
Run Code Online (Sandbox Code Playgroud)
诀窍在于供应商。当主 ActiveMQ 出现故障时,供应商将从列表中返回下一个配置的服务器并重新连接到该服务器。
它工作正常。
| 归档时间: |
|
| 查看次数: |
686 次 |
| 最近记录: |