Python 客户端不接收来自 Spring websocket 服务器的消息

sam*_*777 5 python java spring websocket kotlin

该项目的目标是让 JVM(Kotlin)服务器和 Python 客户端通过 websocket 相互通信。服务器必须向客户端发送更新,客户端将处理这些更新。

服务器是一个 Spring Boot 应用程序,运行 Spring Boot websocket 服务器。客户端现在只是一个 Python 脚本,运行 websocket-client 来连接到服务器。

什么工作:

  • 项目清单
  • 客户端可以连接到服务器
  • 客户端可以连接到服务器上的主题(使用 STOMP)
  • 客户端可以向服务器发送消息,服务器接收并处理该消息
  • 服务器可以在 websocket 上发送消息

什么不起作用:

  • 客户端没有收到来自服务器的消息

我已经通过连接到 websocket.org (ws://echo.websocket.org) 的回显服务器尝试了客户端的接收部分,并且客户端从服务器接收回显消息。所以在我看来,问题不在客户端。


代码时间。

Kotlin 服务器: 用于创建 websocket 服务器的代码:

package nl.sajansen.screenchangenotifierserver

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Controller
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
import java.time.Instant
import java.time.format.DateTimeFormatter


@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*")
    }

    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        config.enableSimpleBroker("/topic", "/queue")
        config.setApplicationDestinationPrefixes("/app")
    }
}

@Controller
class WebsocketController @Autowired constructor(val template: SimpMessagingTemplate) {
    @Scheduled(fixedDelayString = "1000")
    fun blastToClientsHostReport() {
        println("Sending something on the websocket")
        template.convertAndSend("/topic/greeting", "Hello World");
    }

    @MessageMapping("/greeting")
    fun handle(message: String): String {
        println("Received message: $message")
        template.convertAndSend("/topic/greeting", message)
        return "[" + getTimestamp() + ": " + message
    }
}

fun getTimestamp(): String = DateTimeFormatter.ISO_INSTANT.format(Instant.now())

Run Code Online (Sandbox Code Playgroud)

Gradle 依赖项等:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.1.9.RELEASE"
    id("io.spring.dependency-management") version "1.0.8.RELEASE"
    kotlin("jvm") version "1.2.71"
    kotlin("plugin.spring") version "1.2.71"
    kotlin("plugin.jpa") version "1.2.71"
    kotlin("plugin.allopen") version "1.2.71"       // For JPA lazy fetching
}

allOpen {
    annotation("javax.persistence.Entity")
    annotation("javax.persistence.Embeddable")
    annotation("javax.persistence.MappedSuperclass")
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    runtimeOnly("com.h2database:h2:1.4.197") // Fixed version as a workaround for https://github.com/h2database/h2database/issues/1841
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    testImplementation("org.springframework.boot:spring-boot-starter-test") {
        exclude(module = "junit")
        exclude(module = "mockito-core")
    }
    testImplementation("org.junit.jupiter:junit-jupiter-api")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
    testImplementation("com.ninja-squad:springmockk:1.1.2")
    compile("org.springframework.boot:spring-boot-starter-websocket")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
Run Code Online (Sandbox Code Playgroud)

Python客户端:

import random

import stomper as stomper
import websocket


def main():
  websocket.enableTrace(True)

  # Connecting to websocket
  ws = websocket.create_connection("ws://localhost:8080/ws")

  # Subscribing to topic
  client_id = str(random.randint(0, 1000))
  sub = stomper.subscribe("/topic/greeting", client_id, ack='auto')
  ws.send(sub)

  # Sending some message
  ws.send(stomper.send("/app/greeting", "Hello there"))

  while True:
    print("Receiving data: ")
    d = ws.recv()
    print(d)


if __name__ == '__main__':
  main()

Run Code Online (Sandbox Code Playgroud)

点依赖:

opencv-python==4.1.1.26
websocket-client==0.56.0
stomper==0.4.3
Run Code Online (Sandbox Code Playgroud)

控制台输出

现在,服务器的控制台输出是这样的。可以看到,当客户端未连接时,没有订阅者发送预定消息。然后客户端连接成功,预定的消息被广播给 1 个订阅者。

opencv-python==4.1.1.26
websocket-client==0.56.0
stomper==0.4.3
Run Code Online (Sandbox Code Playgroud)

客户端的输出是这样的,但只是等到世界末日才收到消息:

Sending something on the websocket
2019-10-17 12:45:09.425 DEBUG 32285 --- [MessageBroker-3] org.springframework.web.SimpLogging      : Processing MESSAGE destination=/topic/greeting session=null payload=Hello World
Sending something on the websocket
2019-10-17 12:45:10.426 DEBUG 32285 --- [MessageBroker-3] org.springframework.web.SimpLogging      : Processing MESSAGE destination=/topic/greeting session=null payload=Hello World
2019-10-17 12:45:10.849  INFO 32285 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-10-17 12:45:10.850  INFO 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2019-10-17 12:45:10.850 DEBUG 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Detected StandardServletMultipartResolver
2019-10-17 12:45:10.855 DEBUG 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : enableLoggingRequestDetails='true': request parameters and headers will be shown which may lead to unsafe logging of potentially sensitive data
2019-10-17 12:45:10.855  INFO 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 5 ms
2019-10-17 12:45:10.861 DEBUG 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/ws", parameters={}
2019-10-17 12:45:10.865 DEBUG 32285 --- [nio-8080-exec-1] o.s.w.s.s.s.WebSocketHandlerMapping      : Mapped to org.springframework.web.socket.server.support.WebSocketHttpRequestHandler@27a9f025
2019-10-17 12:45:10.872 DEBUG 32285 --- [nio-8080-exec-1] o.s.w.s.s.s.WebSocketHttpRequestHandler  : GET /ws
2019-10-17 12:45:10.885 DEBUG 32285 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed 101 SWITCHING_PROTOCOLS
2019-10-17 12:45:10.901 DEBUG 32285 --- [nio-8080-exec-1] s.w.s.h.LoggingWebSocketHandlerDecorator : New StandardWebSocketSession[id=393fc3cd-9ca3-1749-1ea8-541def6592e0, uri=ws://localhost:8080/ws]
2019-10-17 12:45:10.912 DEBUG 32285 --- [nboundChannel-2] org.springframework.web.SimpLogging      : Processing SUBSCRIBE /topic/greeting id=216 session=393fc3cd-9ca3-1749-1ea8-541def6592e0
2019-10-17 12:45:10.914 DEBUG 32285 --- [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/greeting session=393fc3cd-9ca3-1749-1ea8-541def6592e0 text/plain payload=Hello there, lookupDestination='/greeting'
2019-10-17 12:45:10.915 DEBUG 32285 --- [nboundChannel-7] .WebSocketAnnotationMethodMessageHandler : Invoking nl.sajansen.screenchangenotifierserver.WebsocketController#handle[1 args]
Received message: Hello there
2019-10-17 12:45:10.916 DEBUG 32285 --- [nboundChannel-7] org.springframework.web.SimpLogging      : Processing MESSAGE destination=/topic/greeting session=null payload=Hello there
2019-10-17 12:45:10.916 DEBUG 32285 --- [nboundChannel-7] org.springframework.web.SimpLogging      : Broadcasting to 1 sessions.
2019-10-17 12:45:10.919 DEBUG 32285 --- [nboundChannel-7] org.springframework.web.SimpLogging      : Processing MESSAGE destination=/topic/greeting session=393fc3cd-9ca3-1749-1ea8-541def6592e0 payload=[2019-10-17T10:45:10.917Z: Hello there
2019-10-17 12:45:10.919 DEBUG 32285 --- [nboundChannel-7] org.springframework.web.SimpLogging      : Broadcasting to 1 sessions.
Sending something on the websocket
2019-10-17 12:45:11.427 DEBUG 32285 --- [MessageBroker-3] org.springframework.web.SimpLogging      : Processing MESSAGE destination=/topic/greeting session=null payload=Hello World

Run Code Online (Sandbox Code Playgroud)

如果您有更好的想法,请提供更多背景信息:更新将包含一个图像文件(可能编码为 base64)。更新必须接近实时发送(允许延迟不超过 1 秒)。这些更新的间隔可以从几分钟到半秒不等。客户端和服务器是同一网络中的两台不同机器,但该网络的吞吐量有限。


那么,谁能发现哪里出了问题?

我已经阅读了本文档中关于 websockets 的很大一部分,但我看不出哪里出了问题:https : //docs.spring.io/spring/docs/current/spring-framework-reference/web.html#网络套接字

这个答案也让我走得很远,但是这个例子本身并没有开始工作(在更正while not Trueto 之后while True)。

2019 年 10 月 18 日更新:我寻找了一个 Python SocketJS 客户端,因为我让 Kotlin 服务器部分与 SocketJS 和 JavaScript 客户端一起工作。但是我找不到任何 Python SocketJS 客户端实现.. 我想知道剩下的唯一解决方案是否是在 Python 应用程序(客户端)中启动 websocket 服务器并让客户端将它的 websocket 服务器详细信息发送到 Kotlin 服务器,然后将连接到客户端的 websocket 服务器。这不是一个很好的解决方案,但我想知道它是否有效。我会及时通知你的。

2021 年 2 月 1 日更新:我没有再花时间解决这个问题。但是我会放这篇文章以防万一有人解决这个问题并且可以帮助其他人解决这个问题。

小智 1

上个月我遇到了同样的问题。我假设您正在使用 Python 的 Stomp over Web Socket。参考Websocket客户端未收到任何消息,我认为您忘记启动连接。

因此,您应该从此更改您的 python 客户端代码

...
 # Connecting to websocket
  ws = websocket.create_connection("ws://localhost:8080/ws")

  # Subscribing to topic
  client_id = str(random.randint(0, 1000))
  sub = stomper.subscribe("/topic/greeting", client_id, ack='auto')
  ws.send(sub)
...
Run Code Online (Sandbox Code Playgroud)

进入这个

 # Connecting to websocket
 ws = websocket.create_connection("ws://localhost:8080/ws")
  
 # Initate Stomp connection!
 ws.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")

  # Subscribing to topic
  client_id = str(random.randint(0, 1000))
  sub = stomper.subscribe("/topic/greeting", client_id, ack='auto')
  ws.send(sub)
Run Code Online (Sandbox Code Playgroud)

我希望它有效。

尽管如此,我通过WebSocketAppimport websocket库中实现类来解决这个问题,而不是像代码那样执行程序步骤。