Spring Boot嵌入式HornetQ集群不转发消息

ci_*_*ci_ 32 hornetq spring-boot

我正在尝试使用嵌入式HornetQ服务器创建两个Spring Boot应用程序的静态集群.一个应用程序/服务器将处理外部事件并生成要发送到消息队列的消息.另一个应用程序/服务器将侦听消息队列并处理传入消息.由于两个应用程序之间的链接不可靠,因此每个应用程序将仅使用本地/ inVM客户端在其各自的服务器上生成/使用消息,并依赖于群集功能将消息转发到群集中其他服务器上的队列.

我正在使用HornetQConfigurationCustomizer自定义嵌入式HornetQ服务器,因为默认情况下它只附带一个InVMConnectorFactory.

我创建了几个说明此设置的示例应用程序,在此示例中,"ServerSend"指的是将生成消息的服务器,"ServerReceive"指的是将消耗消息的服务器.

两个应用程序的pom.xml包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)

DemoHornetqServerSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

application.properties(ServerSend):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
Run Code Online (Sandbox Code Playgroud)

DemoHornetqServerReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

application.properties(ServerReceive):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
Run Code Online (Sandbox Code Playgroud)

启动两个应用程序后,日志输出显示如下:

ServerSend:

2015-04-09 11:11:58.471 INFO 7536 --- [main] org.hornetq.core.server:HQ221000:实时服务器以配置HornetQ Configuration开始(clustered = true,backup = false,sharedStore = true,journalDirectory = C:\ Users****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory = data/bindings,largeMes​​sagesDirectory = data/largemessages,pagingDirectory = data/paging)
2015-04-09 11:11:58.501 INFO 7536 --- [main] org.hornetq.core.server:HQ221045:libaio不可用,将配置切换到NIO
2015-04-09 11:11:58.595 INFO 7536 --- [main] org.hornetq. core.server:HQ221043:添加协议支持CORE
2015-04-09 11:11:58.720 INFO 7536 --- [main] org.hornetq.core.server:HQ221003:尝试部署队列jms.queue.jms.testqueue
2015 -04-09 11:11:59.568 INFO 7536 --- [main] org.hornetq.core.server:HQ221020:已启动Netty Acceptor版本4.0.13.Final localhost:5445
2015-04-09 11:11:59.593 INFO 7536 --- [主要] org.hornetq.core.server:HQ221007:服务器现已上线
2015-04- 09 11:11:59.593 INFO 7536 --- [主要] org.hornetq.core.server:HQ221001:HarnetQ Server版本2.4.5.FINAL(Wild Hornet,124)[c139929d-d90f-11e4-ba2e-e58abf5d6944]

ServerReceive:

2015-04-09 11:12:04.401 INFO 4528 --- [main] org.hornetq.core.server:HQ221000:实时服务器以配置HornetQ Configuration开始(clustered = true,backup = false,sharedStore = true,journalDirectory = C:\ Users****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory = data/bindings,largeMes​​sagesDirectory = data/largemessages,pagingDirectory = data/paging)
2015-04-09 11:12:04.410 INFO 4528 --- [main] org.hornetq.core.server:HQ221045:libaio不可用,将配置切换为NIO
2015-04-09 11:12:04.520 INFO 4528 --- [main] org.hornetq. core.server:HQ221043:添加协议支持CORE
2015-04-09 11:12:04.629 INFO 4528 --- [main] org.hornetq.core.server:HQ221003:尝试部署队列jms.queue.jms.testqueue
2015 -04-09 11:12:05.545 INFO 4528 --- [main] org.hornetq.core.server:HQ221020:已启动Netty Acceptor版本4.0.13.Final localhost:5446
2015-04-09 11:12:05.578 INFO 4528 --- [主要] org.hornetq.core.server:HQ221007:服务器现已上线
2015-04- 09 11:12:05.578 INFO 4528 --- [主要] org.hornetq.core.server:HQ221001:HarnetQ Server版本2.4.5.FINAL(Wild Hornet,124)[c139929d-d90f-11e4-ba2e-e58abf5d6944]

clustered=true在两个输出中看到,这将显示false我是否从中删除了群集配置HornetQConfigurationCustomizer,因此它必须有一些效果.

现在,ServerSend在控制台输出中显示:

发送消息:服务器的时间戳:1428574324910
发送消息:服务器的时间戳:1428574329899
发送消息:服务器的时间戳:1428574334904

但是,ServerReceive没有显示任何内容.

似乎消息不会从ServerSend转发到ServerReceive.

我做了一些测试,通过创建另外两个Spring Boot应用程序(ClientSend和ClientReceive),它们没有嵌入HornetQ服务器,而是连接到"本机"服务器.

两个客户端应用程序的pom.xml包含:

2015-04-09 11:11:58.471  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:11:58.501  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:11:58.595  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:11:58.720  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:11:59.568  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 
Run Code Online (Sandbox Code Playgroud)

DemoHornetqClientSendApplication:

2015-04-09 11:12:04.401  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:12:04.410  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:12:04.520  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:12:04.629  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:12:05.545  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 
Run Code Online (Sandbox Code Playgroud)

application.properties(ClientSend):

Sending message: Timestamp from Server: 1428574324910  
Sending message: Timestamp from Server: 1428574329899  
Sending message: Timestamp from Server: 1428574334904  
Run Code Online (Sandbox Code Playgroud)

DemoHornetqClientReceiveApplication:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)

application.properties(ClientReceive):

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}
Run Code Online (Sandbox Code Playgroud)

现在控制台显示:

ServerReveive:

收到的消息:来自客户端的时间戳:1428574966630
收到的消息:来自客户端的时间戳:1428574971600
收到的消息:来自客户端的时间戳:1428574976595

ClientReceive:

收到消息:服务器的时间戳:1428574969436
收到消息:服务器的时间戳:1428574974438
收到消息:服务器的时间戳:1428574979446

如果我让ServerSend运行一段时间,然后启动ClientReceive,它还会收到排队到那一点的所有消息,这样就表明这些消息不会在某个地方消失,或者从其他地方消失.

为了完整起见,我还将ClientSend指向ServerSend和ClientReceive指向ServerReceive,以查看群集和InVM客户端是否存在某些问题,但同样没有任何outout指示在ClientReceive或ServerReceive中收到任何消息.

因此,向/从每个嵌入式代理发送到直接连接的外部客户端的消息传递似乎正常,但在集群中的代理之间不会转发任何消息.

所以,在这一切之后,最重要的问题是,在集群内没有转发消息的设置有什么问题?

小智 1

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

“HornetQ 核心被设计为一组简单的 POJO,因此如果您有一个应用程序需要内部消息传递功能,但您不想将其公开为 HornetQ 服务器,您可以直接实例化 HornetQ 服务器并将其嵌入到您自己的应用程序中。”

如果您嵌入它,则不会将其公开为服务器。每个容器都有一个单独的实例。这相当于启动 2 个 hornet 副本并为它们指定相同的队列名称。一个写入第一个实例上的该队列,另一个监听第二个实例上的队列。

如果您想以这种方式解耦应用程序,则需要有一个充当服务器的位置。也许,您想要集群。顺便说一句,这并不是大黄蜂特有的。你会经常发现这种模式。