Spring boot - 如何使用故障转移 URL 连接到外部 ActiveMQ 主/从集群

Kar*_*ert 6 activemq-classic stomp message-queue cluster-computing spring-boot

我们在不同的 VM 上有 2 个 ActiveMQ 节点(例如主机:amq1、amq2)。它们作为主/从集群链接。

我们想使用故障转移协议连接到这个集群。如何才能做到这一点?春季启动配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
        .setRelayHost(activeMQProperties.getRelayHost())
        .setRelayPort(activeMQProperties.getRelayPort());
  }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,这里我们只能设置一台主机和一个端口。我们怎么能设置这样的东西:

failover:(stomp://amq1:61613,stomp://amq2:61613)
Run Code Online (Sandbox Code Playgroud)

更新:目前使用的是 Spring Boot 2.3.5

Kar*_*ert 1

我已经尝试过您在连接字符串中使用“故障转移”提到的选项,但它不起作用,并且发现一些线程甚至不支持 stomp。

所以最终的解决方案看起来像是一个自己的实现:两个具有主从配置的 ActiveMQ 服务器。

Spring配置(重要部分):

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
    private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
    
    private final ActiveMQProperties activeMQProperties;
    
    // used by own round-robin implementation to connect to the current master ActiveMQ
    private int index = 0;
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
            .setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setTcpClient(createTcpClient());
    }
    
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
            client -> client.remoteAddress(socketAddressSupplier()),
            new StompReactorNettyCodec());
    }

    private Supplier<? extends SocketAddress> socketAddressSupplier() {

        Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
            index++;
            if (index >= activeMQProperties.getActiveMQServerList().size()) {
                index = 0;
            }
            return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
                activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
        };

        return socketAddressSupplier;
    }
}
Run Code Online (Sandbox Code Playgroud)

ActiveMQ 属性:

activemq:                              
    activeMQServerList:
      -
        relayHost: host1
        relayPort: 61613
      -
        relayHost: host2
        relayPort: 61613
Run Code Online (Sandbox Code Playgroud)

诀窍在于供应商。当主 ActiveMQ 出现故障时,供应商将从列表中返回下一个配置的服务器并重新连接到该服务器。

它工作正常。