有一个很棒的答案,详细介绍了 REST api 的工作原理。
Websocket 如何以类似的细节工作?
websocket socket.io spring-websocket libwebsockets java-websocket
我正在尝试使用 Spring StandardWebSocketClient 建立 WebSocket 连接,但由于代理设置而出现错误,因为服务器在代理后面运行。下面是我正在使用的代码。
StandardWebSocketClient aStandardWebSocketClient=new StandardWebSocketClient();
WebSocketConnectionManager manager = new WebSocketConnectionManager(aStandardWebSocketClient, new WebSockethandler(), aWebSockURL);
Run Code Online (Sandbox Code Playgroud)
成功设置代理配置后可以进行休息呼叫。
是否有任何代理配置可用于使 StandardWebSocketClient 连接到 websocket 服务器?
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
InetSocketAddress address = new InetSocketAddress("proxyHost",8080);
Proxy proxy = new Proxy(Proxy.Type.HTTP,address);
factory.setProxy(proxy);
restTemplate.setRequestFactory(factory);
Run Code Online (Sandbox Code Playgroud) 好吧,我几乎已经阅读了关于 spring websocket 的每一篇 stackoveflow 帖子。我想在特定客户端连接到服务器后向其发送消息,很简单!
我已经能够从客户端进行连接,并且可以在服务器端侦听新连接。但是当我尝试从服务器端发送消息时,什么也没有发生。即使我的客户已经订阅了。
下面是我的客户端js。
var stompClient = null;
$(document).ready(function() {
connect();
});
function connect() {
var socket = new SockJS('/context_path/stockfeeds');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/share', function(message) {
console.log("Subscription successful............");
console.log(JSON.parse(message.body));
});
stompClient.subscribe('/user/queue/reply', function(message) {
console.log("Subscribed to user ...");
});
}, function (error) {
console.log("Stomp protocol error: "+ error);
});
}
$('#livetext').click(function () {
stompClient.send('/app/stockfeeds', {}, 'This is BS');
});
Run Code Online (Sandbox Code Playgroud)
当我点击“#livetext”时,发送功能就会起作用。并且在客户端收到响应。下面是我的控制器。
@Controller
public class StockDispatchController {
@Autowired
LoggerService loggerService;
@MessageMapping("/stockfeeds")
@SendTo("/topic/share") …Run Code Online (Sandbox Code Playgroud) 我有非常简单的 SpringBoot 项目,具有简单的配置和简单的集成测试来测试 WebSockets。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sandbox.websocket</groupId>
<artifactId>websocket</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Run Code Online (Sandbox Code Playgroud)
SpringBoot应用:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
消息代理配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
}
@Override
public …Run Code Online (Sandbox Code Playgroud) 我必须在 Spring Boot 应用程序中添加对自定义 WebSocket 子协议(因此不是 STOMP)的支持,但我很难理解我需要提供什么以及 Spring 已经拥有什么。
这就是我走了多远:
@Configuration
@EnableWebSocket
public class WebSocketAutoConfiguration implements WebSocketConfigurer {
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry.addHandler(this.webSocketHandler(), new String[]{endpointUrl});
}
@Bean
public WebSocketHandler webSocketHandler() {
ExecutorSubscribableChannel clientInboundChannel = new ExecutorSubscribableChannel();
ExecutorSubscribableChannel clientOutboundChannel = new ExecutorSubscribableChannel();
SubProtocolWebSocketHandler subProtocolWebSocketHandler = new SubProtocolWebSocketHandler(clientInboundChannel, clientOutboundChannel);
subProtocolWebSocketHandler.addProtocolHandler(new SubProtocolHandler() {
public List<String> getSupportedProtocols() {
return Collections.singletonList("custom-protocol");
}
public void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel) throws Exception {
session.sendMessage(new TextMessage("some message"));
}
public void handleMessageToClient(WebSocketSession session, Message<?> message) throws …Run Code Online (Sandbox Code Playgroud) 我正在运行Spring Boot@2.2.x带有公开 WebSocket 端点的服务器。这是我的WebSocketConfiguration:
@Slf4j
@Configuration
public class WebSocketConfiguration {
private static final String WS_PATH = "/ws/notifications";
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> handlersMap = new HashMap<>();
handlersMap.put(WS_PATH, session -> session.send(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnEach(logNext(log::info))
.map(msg -> format("notification for your msg: %s", msg))
.map(session::textMessage)));
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
handlerMapping.setUrlMap(handlersMap);
return handlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter(WebSocketService webSocketService) {
return new WebSocketHandlerAdapter(webSocketService);
}
@Bean
public WebSocketService webSocketService() {
return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy());
}
}
Run Code Online (Sandbox Code Playgroud)
问题是如何使用Basic …
websocket spring-boot spring-websocket java-websocket spring-webflux
我们一直在评估 Spring-Stomp-Broker-websockets,以使其成为在 AWS 上运行的全双工类型消息应用程序。我们原本希望使用 Amazon MQ。我们向个人用户推送消息,也进行广播。所以从功能上来说,这个堆栈看起来确实不错。我们有大约 40,000 - 80,000 名用户。通过负载测试,我们很快发现 Spring 堆栈或 Amazon MQ 都无法很好地扩展,存在以下问题:
\n\n是的,我们增加了机器上的文件句柄等,因此 TCP 连接不是限制。Spring 不可能接近这里的限制。我们正在发送一条 18 K 的消息,用于负载,这是我们期望的最大值。在我们的结果中,消息大小几乎没有影响,它只是 Spring Stack 上的连接开销。
\n\nStompBrokerRelayMessageHandler 为每个 STOMP Connect 打开与 Broker 的连接。没有办法集中连接。因此,这使得这个 Spring 功能对于任何 \xe2\x80\x98real\xe2\x80\x99 Web 应用程序完全无用。为了支持我们的用户,用于 MQ 的 AWS 大型服务器的成本意味着该解决方案非常昂贵,需要 40 台最大的服务器。在负载测试中,Amazon MQ 机器不执行任何操作,对于 1000 个用户,它没有加载。实际上,我们所有代理只需要几台中型机器即可。
\n\n有没有人曾经使用 …
我有一个使用 web-sockets 和 stomp 的 Spring Boot 应用程序,由于我们的 ISAM 设置的限制,我必须使用该xhr-polling协议,并且该应用程序将托管在Pivotal Cloud Foundry (PCF).
当我运行单个实例时,使用以下代码(如下)一切正常。
服务器
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/dummy");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry
.addEndpoint("/dummyendpoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
Run Code Online (Sandbox Code Playgroud)
客户
var socket,client;
socket = new SockJS('http://localhost:8080/dummyendpoint');
client = Stomp.over(socket);
client.connect({}, function () {
client.subscribe('/dummy/message', function (message) {
console.log('subscribed');
}
});
Run Code Online (Sandbox Code Playgroud)
但是,如果我扩展到 2 个实例,web-socket连接就会开始失败:
GET localhost:8080/dummyendpoint/info -> Status 200
POST …Run Code Online (Sandbox Code Playgroud) 在 Reactor 3.4.0 中,不同的 FluxProcessor(例如“DirectProcessor”)已被弃用。我使用这样的处理器作为订阅者,请参见下面的示例。
现在我想知道如何迁移我的代码才能使用推荐的Sinks.many()方法?有任何想法吗?
旧代码:
DirectProcessor<String> output = DirectProcessor.create();
output.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();
Run Code Online (Sandbox Code Playgroud)
根据已弃用的 DirectProcessor 的 JavaDoc,我应该使用Sinks.many().multicast().directBestEffort(). 但我想知道如何在我的 WebSocketClient 中使用它?
迁移代码:
Many<String> sink = Sinks.many().multicast().directBestEffort();
Flux<String> flux = sink.asFlux();
flux.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribe ... // <-- how to do …Run Code Online (Sandbox Code Playgroud) 现在,我在我的 SpringBoot 应用程序中实现了一个简单的 Kafka Consumer 和 Producer,它工作得很好,接下来我想做的是,我的消费者获取消费的消息并将其直接广播给所有订阅的客户端。我发现我不能将 STOMP 消息传递与 WebFlux 一起使用,那么我该如何完成这个任务,我看到了反应式 WebSocket 实现,但我不知道如何将我使用的数据发送到我的 websocket。
这是我的简单 KafkaProducer:
fun addMessage(message: Message){
val headers : MutableMap<String, Any> = HashMap()
headers[KafkaHeaders.TOPIC] = topicName
kafkaTemplate.send(GenericMessage<Message>(message, headers))
}
Run Code Online (Sandbox Code Playgroud)
我的简单消费者看起来像这样:
@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
fun receiveData(message:Message) :Message{
//Take consumed data and send to websocket
}
Run Code Online (Sandbox Code Playgroud) spring-websocket ×10
spring ×4
websocket ×4
java ×3
spring-boot ×2
stomp ×2
apache-kafka ×1
long-polling ×1
socket.io ×1
spring-cloud ×1