标签: spring-messaging

使用Spring 4 Websockets声明并使用RabbitMQ

在我们当前的应用程序中,我们以这种方式使用Spring AMQP:

<rabbit:connection-factory  id="cachingConnectionFactory" 
                            username="${rabbitmq.connection.username}"
                            password="${rabbitmq.connection.password}"
                            host="${rabbitmq.connection.host}" 
                            port="${rabbitmq.connection.port}"
                            executor="rabbitmqPoolTaskExecutor"
                            requested-heartbeat="${rabbitmq.connection.requested-heartbeat}"
                            channel-cache-size="${rabbitmq.connection.channel-cache-size}"
                            virtual-host="${rabbitmq.connection.virtual-host}" />

<rabbit:admin   id="adminRabbit" 
                connection-factory="cachingConnectionFactory"
                auto-startup="true" />

<rabbit:template    id="rabbitTemplate"
                    connection-factory="cachingConnectionFactory"
                    exchange="v1.general.exchange"
                    message-converter="jsonMessageConverter" 
                    encoding="${rabbitmq.template.encoding}"
                    channel-transacted="${rabbitmq.template.channel-transacted}" />

<rabbit:queue id="v1.queue.1" name="v1.queue.1"  />
<rabbit:queue id="v1.queue.2" name="v1.queue.2" />
<rabbit:queue id="v1.queue.3" name="v1.queue.3" />

<fanout-exchange name="v1.general.exchange" xmlns="http://www.springframework.org/schema/rabbit" >
    <bindings>
        <binding queue="v1.queue.1" />
        <binding queue="v1.queue.2" />
        <binding queue="v1.queue.3" />
    </bindings>
</fanout-exchange>

<listener-container xmlns="http://www.springframework.org/schema/rabbit"
                    connection-factory="cachingConnectionFactory" 
                    message-converter="jsonMessageConverter" 
                    task-executor="rabbitmqPoolTaskExecutor"
                    auto-startup="${rabbitmq.listener-container.auto-startup}"
                    concurrency="${rabbitmq.listener-container.concurrency}"
                    channel-transacted="${rabbitmq.listener-container.channel-transacted}"
                    prefetch="${rabbitmq.listener-container.prefetch}"
                    transaction-size="${rabbitmq.listener-container.transaction-size}" >

    <listener id="v1.listener.queue.1" ref="listener1" method="handleMessage" queues="v1.queue.1" />
    <listener id="v1.listener.queue.2" ref="listener2" method="handleMessage" queues="v1.queue.2" />
    <listener id="v1.listener.queue.3" ref="listener3" …
Run Code Online (Sandbox Code Playgroud)

spring-4 spring-messaging spring-websocket

3
推荐指数
1
解决办法
5062
查看次数

如何在JmsMessagingTemplate.sendAndReceive上设置等待超时

我在MVC控制器中使用来自JmsMessagingTemplate的sendAndReceive,但是如果没有发送回复消息,它似乎永远都在等待回复。该文档指出:

返回:答复,如果由于超时等原因无法接收到消息,则可能为null。

但是我只是不知道在指定了多长时间或在何处配置超时之后。谁可以给我解释一下这个?

仅供参考:我不是在使用spring-integration,而是使用spring-messaging。

java spring spring-mvc spring-jms spring-messaging

3
推荐指数
1
解决办法
3899
查看次数

添加 STOMP 标头而不在 ChannelInterceptorAdapter 上重新创建消息

我需要向 STOMP 消息添加标头,目前它的工作方式如下,但我正在重新创建消息,是否可以只添加本机标头而不必重新创建消息以提高性能。

public class MyChannelInterceptor extends ChannelInterceptorAdapter {


    @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        StompCommand command = accessor.getCommand();
        if(command != null) {
            log.debug("Receiving msg {} from {}",command,accessor.getUser().getName());
            if(command == StompCommand.SEND) {
                log.debug("Adding expires header to  msg {} from {}",command,accessor.getUser().getName());
                String ttlString = accessor.getFirstNativeHeader("ttl");
                long ttl = 30000;
                try {
                    ttl = Long.parseLong(ttlString);
                } 
                catch(Exception ex) {
                    log.error("TTL header received but not in correct format {}",ttlString);
                }
                accessor.addNativeHeader("expires", Long.toString(System.currentTimeMillis() + ttl));

                return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()); …
Run Code Online (Sandbox Code Playgroud)

spring-messaging spring-websocket

3
推荐指数
1
解决办法
1888
查看次数

正确使用 LoadbalanceRSocketClient 和 Spring 的 RSocketRequester

我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。

我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:

List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
 .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());   
Run Code Online (Sandbox Code Playgroud)

配置完成后,请求者将在计时器循环中重复使用,如下所示:

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)

它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:

java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …
Run Code Online (Sandbox Code Playgroud)

spring-messaging rsocket rsocket-java

3
推荐指数
1
解决办法
596
查看次数

从Spring Websocket程序发送STOMP错误

我有一个Spring Websocket Stomp应用程序,它接受SUBSCRIBE请求.

在应用程序中,我有一个SUBSCRIBE的处理程序,即

 @Component
 public class SubscribeStompEventHandler implements ApplicationListener<SessionSubscribeEvent> {

    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) {}
 }
Run Code Online (Sandbox Code Playgroud)

我用来验证订阅.

我会检查onApplicationEvent中的某些内容并从此函数将STOMP ERROR消息发送回客户端.

我找到了这个配方如何使用Spring WebSocket向STOMP客户端发送ERROR消息?但我需要了解如何获得outboundChannel.

我还尝试了以下代码:

   public void sendStompError(SimpMessagingTemplate simpMessagingTemplate, String sessionId, String topic, String errorMessage) {

    StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
    headerAccessor.setMessage(errorMessage);
    headerAccessor.setSessionId(sessionId);
    headerAccessor.setLeaveMutable(true);
    simpMessagingTemplate.convertAndSendToUser(sessionId, topic, new byte[0], headerAccessor.getMessageHeaders());       
}
Run Code Online (Sandbox Code Playgroud)

我试过主题成为一些subsciption主题和/ queue/error主题.但是我没有看到消息传播到客户端.

在客户端,我使用:

      stompClient.connect(headers
                , function (frame) {
                    console.log("Conn OK " + url);               
                }, function (error) {                       
                    console.log("Conn NOT OK " + url + ": " …
Run Code Online (Sandbox Code Playgroud)

java spring spring-messaging spring-websocket

2
推荐指数
1
解决办法
3138
查看次数

如果在上下文中添加 AbstractSecurityWebSocketMessageBrokerConfigurer 的实现,则应用程序无法启动

我尝试为我的应用程序添加 websocket 授权。

我有以下授权相关类:

@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    private static final String SECURE_ADMIN_PASSWORD = "rockandroll";

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
                .csrf().disable()
                .formLogin()
                .loginPage("/index.html")
                    .loginProcessingUrl("/login")
                    .defaultSuccessUrl("/sender.html")
                    .permitAll()
                .and()
                .logout()
                    .logoutSuccessUrl("/index.html")
                    .permitAll()
                .and()
                .authorizeRequests()
                .antMatchers("/js/**", "/lib/**", "/images/**", "/css/**", "/index.html", "/","/*.css","/webjars/**", "/*.js").permitAll()
                .antMatchers("/websocket").hasRole("ADMIN")
                .requestMatchers(EndpointRequest.toAnyEndpoint()).hasRole("ADMIN")
                .anyRequest().authenticated();

    }

    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {

        auth.authenticationProvider(new AuthenticationProvider() {

            @Override
            public boolean supports(Class<?> authentication) {
                return UsernamePasswordAuthenticationToken.class.isAssignableFrom(authentication);
            }

            @Override
            public Authentication authenticate(Authentication authentication) throws AuthenticationException …
Run Code Online (Sandbox Code Playgroud)

java spring-security spring-boot spring-messaging spring-websocket

2
推荐指数
1
解决办法
1569
查看次数

Spring Stomp @SendToUser与未经身份验证的用户无法正常工作

我正在尝试回复未经身份验证的用户使用@SendToUser.

  • 春季4.1.1

我正在使用新创建的Spring Boot应用程序,我唯一的配置是:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user"); 
    }
}
Run Code Online (Sandbox Code Playgroud)

控制器代码:

@MessageMapping("/search")
@SendToUser("/search")
public String search(@Payload String xxx) {
    return "TEST1234";
}
Run Code Online (Sandbox Code Playgroud)

JS:

var socket = new SockJS('/webapp/stomp');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
    stompClient.subscribe('/user/search', function(data){
        alert(data.body);
    });
});
Run Code Online (Sandbox Code Playgroud)

弹簧输出:

DEBUG  org.springframework.web.servlet.DispatcherServlet: 996 - Successfully completed request
DEBUG   o.s.w.s.handler.LoggingWebSocketHandlerDecorator:  45 - New WebSocketServerSockJsSession[id=shizav88]
DEBUG       o.s.m.simp.broker.SimpleBrokerMessageHandler: 158 - …
Run Code Online (Sandbox Code Playgroud)

spring stomp spring-mvc spring-messaging spring-websocket

1
推荐指数
1
解决办法
6321
查看次数

如何将 Spring 表单输入占位符属性值引用到消息源属性?

是否可以使用spring- 标签(spring表单标签库,用于数据绑定)中的标签(spring 标签库)从.properties消息文件中检索消息。spring:messageform:input


src/main/resources > texts.properties(文件)

testEntry=Test entry
Run Code Online (Sandbox Code Playgroud)

src/main/webapp/WIN-INF/JSP > test.jsp

如果我尝试这样做:

testEntry=Test entry
Run Code Online (Sandbox Code Playgroud)

我得到的结果<spring:message code="testEntry">是占位符而不是Test entry


如果我尝试这样做:

<%@taglib prefix='spring' uri='http://www.springframework.org/tags'%>
<%@taglib prefix='form' uri='http://www.springframework.org/tags/form'%>

<form:input path="test" placeholder='<spring:message code="testEntry">'/>
Run Code Online (Sandbox Code Playgroud)

我得到同样的结果。我明白这个问题。但是,是否有另一种方法可以在 JSP 中执行此操作,以便具有适当代码的消息显示为 spring-form 标记的属性值?

java spring jsp spring-form spring-messaging

1
推荐指数
1
解决办法
7261
查看次数

Spring:使用循环发送多条消息

假设我在 Spring XD 中有一个自定义模块(我使用 Spring XD + Spring Integration + hibernate)。该模块基本上从数据库获取一些东西(假设我使用休眠实体存储它,所以我使用一个名为“DataFromDB”的对象)。DataFromDB 是一个列表,然后我从列表中获取每个元素,然后我想使用如下方式发送它:

String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
Run Code Online (Sandbox Code Playgroud)
  • 问题是每次我必须发送消息时我都必须返回消息。
  • 所以我想循环 DataFromDB 列表并将每个元素作为消息发送。

有没有办法发送多条消息?

编辑:

我只是根据尝试复制该场景的评论创建了一个小示例。这就是我所拥有的:

我的变压器类:

public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
    List<Message<?>> messages = new ArrayList<Message<?>>();
    messages.add(new GenericMessage<>("foo"));
    messages.add(new GenericMessage<>("bar"));
    return messages;
}
Run Code Online (Sandbox Code Playgroud)

我的xml配置:

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx.xsd">

    <tx:annotation-driven />
    <int:channel id="input" />

    <bean id="transFormerClass" class="myModule.TransformerClass">
    </bean>

    <int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>

    <int:channel id="output"/>

</beans> …
Run Code Online (Sandbox Code Playgroud)

java spring spring-integration spring-xd spring-messaging

1
推荐指数
1
解决办法
3471
查看次数

传输 FHIR 资源的 RSocket 与 HTTP 性能

我最近开始了一系列关于使用FHIR资源的服务到服务通信的性能调查,以确定在以下方面花费的处理时间:

  1. 有效载荷通信/交换
  2. 序列化和反序列化有效负载

在调查过程中遇到了两个我不明白的结果,因此我需要RSocket开发团队的帮助。我将详细说明结果和问题。

为了找出最快的通信方式,我分析了使用两种传输协议的三种传输方式——HTTP 和 RSocket。更准确地说 - 我已经分析和基准测试:

  1. 使用HAPI REST 服务器和 HAPI FHIR 客户端交换 FHIR 资源
  2. 通过@RestController使用RestTemplateWeb 客户端访问 Spring端点,使用 REST 通信交换字符串(序列化的 FHIR 资源)
  3. 使用RSocket消息交换 FHIR 资源

对前两种通信方法的分析产生了使用 HAPI REST 服务器交换 FHIR 资源与交换(原始)字符串有效载荷之间的巨大差异,包括将这些有效载荷反序列化为 FHIR 资源。更准确地说 - 对于大型 FHIR 资源,HAPI REST 服务器增加的开销大约是(原始)字符串通信和反序列化所带来的开销的 3-4 倍。

关于两个服务之间的 RSocket 通信 - 我尝试使用两种模式来交换 FHIR 资源:

  1. 作为原始字符串,从 FHIR 资源序列化
  2. 作为原始 FHIR 资源,让 RSocket 处理序列化和反序列化

第一种方法(使用原始字符串)产生的负载交换开销几乎与 HTTP(使用 REST)通信所产生的开销相似。更准确地说 - 通信开销比 HTTP 通信高几个百分比 (5-10%)。这让我感到惊讶,因为我认为 RSocket 通信开销将低于 HTTP …

hl7-fhir spring-boot spring-messaging hapi-fhir rsocket

-3
推荐指数
1
解决办法
759
查看次数