Jim*_*mmy 3 java spring spring-integration
我正在学习如何使用 Java Spring 框架并开始尝试 Spring Integration。我正在尝试使用 Spring Integration 将我的应用程序连接到 MQTT 代理以发布和订阅消息,但我无法找到手动将消息发布到出站通道的方法。如果可能的话,我想专门使用java代码中的符号来构建它,而不是使用定义bean和其他相关配置的xml文件。
在每个示例中,我看到手动发布消息的解决方案似乎是使用 MessagingGateway 接口,然后使用 SpringApplicationBuilder 获取 ConfigurableApplicationContext 以获取对 main 方法中网关接口的引用。然后使用该引用来发布消息。是否可以使用 AutoWired 作为接口?在我的尝试中,我只得到一个空指针。
我的目标是构建一个游戏,我订阅一个主题来获取游戏消息,然后每当用户准备好进行下一步操作时,就向该主题发布一条新消息。
更新:这是我一直在研究如何设置出站通道的示例之一:https://docs.spring.io/spring-integration/reference/html/mqtt.html
Gary Russel 回答后更新 2:
这是我在查看示例后编写的一些示例代码,当在 Controller.java 中运行 gateway.sendToMqtt 时使用 @AutoWired 作为网关时,我会得到一个 NullPointer 。我在这里想要实现的是当控制器处理 GET 请求时手动发送 mqtt 消息。
应用程序.java
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
控制器.java
@RestController
@RequestMapping("/publishMessage")
public class Controller {
@Autowired
static Gateway gateway;
@RequestMapping(method = RequestMethod.GET)
public int request(){
gateway.sendToMqtt("Test Message!");
return 0;
}
}
Run Code Online (Sandbox Code Playgroud)
MqttPublisher.java
@EnableIntegration
@Configuration
public class MqttPublisher {
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:1883");
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(){
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("clientPublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("topic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface Gateway {
void sendToMqtt(String data);
}
}
Run Code Online (Sandbox Code Playgroud)
更新:
不确定这是否是正确的日志记录,但这是我通过添加得到的:
logging.level.org.springframework.web=Debug
logging.level.org.hibernate=Error
Run Code Online (Sandbox Code Playgroud)
到应用程序属性。
使用消息传递网关或仅向通道发送消息。
编辑
@SpringBootApplication
public class So47846492Application {
public static void main(String[] args) {
SpringApplication.run(So47846492Application.class, args).close();
}
@Bean
public ApplicationRunner runner(MyGate gate) {
return args -> {
gate.send("someTopic", "foo");
Thread.sleep(5_000);
};
}
@Bean
@ServiceActivator(inputChannel = "toMqtt")
public MqttPahoMessageHandler mqtt() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("tcp://localhost:1883", "foo",
clientFactory());
handler.setDefaultTopic("myTopic");
handler.setQosExpressionString("1");
return handler;
}
@Bean
public MqttPahoClientFactory clientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setUserName("guest");
factory.setPassword("guest");
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttIn() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "bar", "someTopic");
adapter.setOutputChannelName("fromMqtt");
return adapter;
}
@ServiceActivator(inputChannel = "fromMqtt")
public void in(String in) {
System.out.println(in);
}
@MessagingGateway(defaultRequestChannel = "toMqtt")
public interface MyGate {
void send(@Header(MqttHeaders.TOPIC) String topic, String out);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7982 次 |
| 最近记录: |