Spring Integration 手动将消息发布到通道

Jim*_*mmy 3 java spring spring-integration

我正在学习如何使用 Java Spring 框架并开始尝试 Spring Integration。我正在尝试使用 Spring Integration 将我的应用程序连接到 MQTT 代理以发布和订阅消息,但我无法找到手动将消息发布到出站通道的方法。如果可能的话,我想专门使用java代码中的符号来构建它,而不是使用定义bean和其他相关配置的xml文件。

在每个示例中,我看到手动发布消息的解决方案似乎是使用 MessagingGateway 接口,然后使用 SpringApplicationBuilder 获取 ConfigurableApplicationContext 以获取对 main 方法中网关接口的引用。然后使用该引用来发布消息。是否可以使用 AutoWired 作为接口?在我的尝试中,我只得到一个空指针。

我的目标是构建一个游戏,我订阅一个主题来获取游戏消息,然后每当用户准备好进行下一步操作时,就向该主题发布一条新消息。

更新:这是我一直在研究如何设置出站通道的示例之一:https://docs.spring.io/spring-integration/reference/html/mqtt.html

Gary Russel 回答后更新 2:

这是我在查看示例后编写的一些示例代码,当在 Controller.java 中运行 gateway.sendToMqtt 时使用 @AutoWired 作为网关时,我会得到一个 NullPointer 。我在这里想要实现的是当控制器处理 GET 请求时手动发送 mqtt 消息。

应用程序.java

@SpringBootApplication
public class Application {

    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

控制器.java

@RestController
@RequestMapping("/publishMessage")
public class Controller {

    @Autowired
    static Gateway gateway;

    @RequestMapping(method = RequestMethod.GET)
    public int request(){
        gateway.sendToMqtt("Test Message!");
        return 0;
    }
}
Run Code Online (Sandbox Code Playgroud)

MqttPublisher.java

@EnableIntegration
@Configuration
public class MqttPublisher {
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(){
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("clientPublisher", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface Gateway {

        void sendToMqtt(String data);
    }
}
Run Code Online (Sandbox Code Playgroud)

更新

不确定这是否是正确的日志记录,但这是我通过添加得到的:

logging.level.org.springframework.web=Debug
logging.level.org.hibernate=Error
Run Code Online (Sandbox Code Playgroud)

到应用程序属性。

https://hastebin.com/cuvonufeco.hs

Gar*_*ell 6

使用消息传递网关或仅向通道发送消息。

编辑

@SpringBootApplication
public class So47846492Application {

    public static void main(String[] args) {
        SpringApplication.run(So47846492Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(MyGate gate) {
        return args -> {
            gate.send("someTopic", "foo");
            Thread.sleep(5_000);
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "toMqtt")
    public MqttPahoMessageHandler mqtt() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("tcp://localhost:1883", "foo",
                clientFactory());
        handler.setDefaultTopic("myTopic");
        handler.setQosExpressionString("1");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setUserName("guest");
        factory.setPassword("guest");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttIn() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "bar", "someTopic");
        adapter.setOutputChannelName("fromMqtt");
        return adapter;
    }

    @ServiceActivator(inputChannel = "fromMqtt")
    public void in(String in) {
        System.out.println(in);
    }

    @MessagingGateway(defaultRequestChannel = "toMqtt")
    public interface MyGate {

        void send(@Header(MqttHeaders.TOPIC) String topic, String out);

    }

}
Run Code Online (Sandbox Code Playgroud)