在我们当前的应用程序中,我们以这种方式使用Spring AMQP:
<rabbit:connection-factory id="cachingConnectionFactory"
username="${rabbitmq.connection.username}"
password="${rabbitmq.connection.password}"
host="${rabbitmq.connection.host}"
port="${rabbitmq.connection.port}"
executor="rabbitmqPoolTaskExecutor"
requested-heartbeat="${rabbitmq.connection.requested-heartbeat}"
channel-cache-size="${rabbitmq.connection.channel-cache-size}"
virtual-host="${rabbitmq.connection.virtual-host}" />
<rabbit:admin id="adminRabbit"
connection-factory="cachingConnectionFactory"
auto-startup="true" />
<rabbit:template id="rabbitTemplate"
connection-factory="cachingConnectionFactory"
exchange="v1.general.exchange"
message-converter="jsonMessageConverter"
encoding="${rabbitmq.template.encoding}"
channel-transacted="${rabbitmq.template.channel-transacted}" />
<rabbit:queue id="v1.queue.1" name="v1.queue.1" />
<rabbit:queue id="v1.queue.2" name="v1.queue.2" />
<rabbit:queue id="v1.queue.3" name="v1.queue.3" />
<fanout-exchange name="v1.general.exchange" xmlns="http://www.springframework.org/schema/rabbit" >
<bindings>
<binding queue="v1.queue.1" />
<binding queue="v1.queue.2" />
<binding queue="v1.queue.3" />
</bindings>
</fanout-exchange>
<listener-container xmlns="http://www.springframework.org/schema/rabbit"
connection-factory="cachingConnectionFactory"
message-converter="jsonMessageConverter"
task-executor="rabbitmqPoolTaskExecutor"
auto-startup="${rabbitmq.listener-container.auto-startup}"
concurrency="${rabbitmq.listener-container.concurrency}"
channel-transacted="${rabbitmq.listener-container.channel-transacted}"
prefetch="${rabbitmq.listener-container.prefetch}"
transaction-size="${rabbitmq.listener-container.transaction-size}" >
<listener id="v1.listener.queue.1" ref="listener1" method="handleMessage" queues="v1.queue.1" />
<listener id="v1.listener.queue.2" ref="listener2" method="handleMessage" queues="v1.queue.2" />
<listener id="v1.listener.queue.3" ref="listener3" …Run Code Online (Sandbox Code Playgroud) 我在MVC控制器中使用来自JmsMessagingTemplate的sendAndReceive,但是如果没有发送回复消息,它似乎永远都在等待回复。该文档指出:
返回:答复,如果由于超时等原因无法接收到消息,则可能为null。
但是我只是不知道在指定了多长时间或在何处配置超时之后。谁可以给我解释一下这个?
仅供参考:我不是在使用spring-integration,而是使用spring-messaging。
我需要向 STOMP 消息添加标头,目前它的工作方式如下,但我正在重新创建消息,是否可以只添加本机标头而不必重新创建消息以提高性能。
public class MyChannelInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
if(command != null) {
log.debug("Receiving msg {} from {}",command,accessor.getUser().getName());
if(command == StompCommand.SEND) {
log.debug("Adding expires header to msg {} from {}",command,accessor.getUser().getName());
String ttlString = accessor.getFirstNativeHeader("ttl");
long ttl = 30000;
try {
ttl = Long.parseLong(ttlString);
}
catch(Exception ex) {
log.error("TTL header received but not in correct format {}",ttlString);
}
accessor.addNativeHeader("expires", Long.toString(System.currentTimeMillis() + ttl));
return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()); …Run Code Online (Sandbox Code Playgroud) 我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。
我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:
List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
HttpClient httpClient = HttpClient.create()
.baseUrl(url)
.secure(ssl ->
ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
//.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
.transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());
Run Code Online (Sandbox Code Playgroud)
配置完成后,请求者将在计时器循环中重复使用,如下所示:
@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)
它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:
java.util.concurrent.CancellationException: Pool is exhausted
at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …Run Code Online (Sandbox Code Playgroud) 我有一个Spring Websocket Stomp应用程序,它接受SUBSCRIBE请求.
在应用程序中,我有一个SUBSCRIBE的处理程序,即
@Component
public class SubscribeStompEventHandler implements ApplicationListener<SessionSubscribeEvent> {
@Override
public void onApplicationEvent(SessionSubscribeEvent event) {}
}
Run Code Online (Sandbox Code Playgroud)
我用来验证订阅.
我会检查onApplicationEvent中的某些内容并从此函数将STOMP ERROR消息发送回客户端.
我找到了这个配方如何使用Spring WebSocket向STOMP客户端发送ERROR消息?但我需要了解如何获得outboundChannel.
我还尝试了以下代码:
public void sendStompError(SimpMessagingTemplate simpMessagingTemplate, String sessionId, String topic, String errorMessage) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
headerAccessor.setMessage(errorMessage);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
simpMessagingTemplate.convertAndSendToUser(sessionId, topic, new byte[0], headerAccessor.getMessageHeaders());
}
Run Code Online (Sandbox Code Playgroud)
我试过主题成为一些subsciption主题和/ queue/error主题.但是我没有看到消息传播到客户端.
在客户端,我使用:
stompClient.connect(headers
, function (frame) {
console.log("Conn OK " + url);
}, function (error) {
console.log("Conn NOT OK " + url + ": " …Run Code Online (Sandbox Code Playgroud) 我尝试为我的应用程序添加 websocket 授权。
我有以下授权相关类:
@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
private static final String SECURE_ADMIN_PASSWORD = "rockandroll";
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.csrf().disable()
.formLogin()
.loginPage("/index.html")
.loginProcessingUrl("/login")
.defaultSuccessUrl("/sender.html")
.permitAll()
.and()
.logout()
.logoutSuccessUrl("/index.html")
.permitAll()
.and()
.authorizeRequests()
.antMatchers("/js/**", "/lib/**", "/images/**", "/css/**", "/index.html", "/","/*.css","/webjars/**", "/*.js").permitAll()
.antMatchers("/websocket").hasRole("ADMIN")
.requestMatchers(EndpointRequest.toAnyEndpoint()).hasRole("ADMIN")
.anyRequest().authenticated();
}
@Autowired
public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
auth.authenticationProvider(new AuthenticationProvider() {
@Override
public boolean supports(Class<?> authentication) {
return UsernamePasswordAuthenticationToken.class.isAssignableFrom(authentication);
}
@Override
public Authentication authenticate(Authentication authentication) throws AuthenticationException …Run Code Online (Sandbox Code Playgroud) java spring-security spring-boot spring-messaging spring-websocket
我正在尝试回复未经身份验证的用户使用@SendToUser.
我正在使用新创建的Spring Boot应用程序,我唯一的配置是:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user");
}
}
Run Code Online (Sandbox Code Playgroud)
控制器代码:
@MessageMapping("/search")
@SendToUser("/search")
public String search(@Payload String xxx) {
return "TEST1234";
}
Run Code Online (Sandbox Code Playgroud)
JS:
var socket = new SockJS('/webapp/stomp');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
stompClient.subscribe('/user/search', function(data){
alert(data.body);
});
});
Run Code Online (Sandbox Code Playgroud)
弹簧输出:
DEBUG org.springframework.web.servlet.DispatcherServlet: 996 - Successfully completed request
DEBUG o.s.w.s.handler.LoggingWebSocketHandlerDecorator: 45 - New WebSocketServerSockJsSession[id=shizav88]
DEBUG o.s.m.simp.broker.SimpleBrokerMessageHandler: 158 - …Run Code Online (Sandbox Code Playgroud) 是否可以使用spring- 标签(spring表单标签库,用于数据绑定)中的标签(spring 标签库)从.properties消息文件中检索消息。spring:messageform:input
src/main/resources > texts.properties(文件)
testEntry=Test entry
Run Code Online (Sandbox Code Playgroud)
src/main/webapp/WIN-INF/JSP > test.jsp
如果我尝试这样做:
testEntry=Test entry
Run Code Online (Sandbox Code Playgroud)
我得到的结果<spring:message code="testEntry">是占位符而不是Test entry
如果我尝试这样做:
<%@taglib prefix='spring' uri='http://www.springframework.org/tags'%>
<%@taglib prefix='form' uri='http://www.springframework.org/tags/form'%>
<form:input path="test" placeholder='<spring:message code="testEntry">'/>
Run Code Online (Sandbox Code Playgroud)
我得到同样的结果。我明白这个问题。但是,是否有另一种方法可以在 JSP 中执行此操作,以便具有适当代码的消息显示为 spring-form 标记的属性值?
假设我在 Spring XD 中有一个自定义模块(我使用 Spring XD + Spring Integration + hibernate)。该模块基本上从数据库获取一些东西(假设我使用休眠实体存储它,所以我使用一个名为“DataFromDB”的对象)。DataFromDB 是一个列表,然后我从列表中获取每个元素,然后我想使用如下方式发送它:
String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
Run Code Online (Sandbox Code Playgroud)
有没有办法发送多条消息?
编辑:
我只是根据尝试复制该场景的评论创建了一个小示例。这就是我所拥有的:
我的变压器类:
public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
List<Message<?>> messages = new ArrayList<Message<?>>();
messages.add(new GenericMessage<>("foo"));
messages.add(new GenericMessage<>("bar"));
return messages;
}
Run Code Online (Sandbox Code Playgroud)
我的xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<tx:annotation-driven />
<int:channel id="input" />
<bean id="transFormerClass" class="myModule.TransformerClass">
</bean>
<int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>
<int:channel id="output"/>
</beans> …Run Code Online (Sandbox Code Playgroud) 我最近开始了一系列关于使用FHIR资源的服务到服务通信的性能调查,以确定在以下方面花费的处理时间:
在调查过程中遇到了两个我不明白的结果,因此我需要RSocket开发团队的帮助。我将详细说明结果和问题。
为了找出最快的通信方式,我分析了使用两种传输协议的三种传输方式——HTTP 和 RSocket。更准确地说 - 我已经分析和基准测试:
@RestController使用RestTemplateWeb 客户端访问 Spring端点,使用 REST 通信交换字符串(序列化的 FHIR 资源)对前两种通信方法的分析产生了使用 HAPI REST 服务器交换 FHIR 资源与交换(原始)字符串有效载荷之间的巨大差异,包括将这些有效载荷反序列化为 FHIR 资源。更准确地说 - 对于大型 FHIR 资源,HAPI REST 服务器增加的开销大约是(原始)字符串通信和反序列化所带来的开销的 3-4 倍。
关于两个服务之间的 RSocket 通信 - 我尝试使用两种模式来交换 FHIR 资源:
第一种方法(使用原始字符串)产生的负载交换开销几乎与 HTTP(使用 REST)通信所产生的开销相似。更准确地说 - 通信开销比 HTTP 通信高几个百分比 (5-10%)。这让我感到惊讶,因为我认为 RSocket 通信开销将远低于 HTTP …
spring-messaging ×10
java ×5
spring ×5
rsocket ×2
spring-boot ×2
spring-mvc ×2
hapi-fhir ×1
hl7-fhir ×1
jsp ×1
rsocket-java ×1
spring-4 ×1
spring-form ×1
spring-jms ×1
spring-xd ×1
stomp ×1