Websocket 使用队列等待服务器响应

Hyp*_*olf 2 javascript websocket promise async-await

我使用 websocket 发送和接收数据(每秒最多 30 条小消息)。我希望客户端发送 websocket 有效负载并等待来自服务器的特定消息。

流动:

客户端发送请求

它还将 requestId (163)作为带有时间戳waitingResponse的新对象存储在对象中sent

waitingResponse = {
  163: { sent: 1583253453549 }
}
Run Code Online (Sandbox Code Playgroud)

当服务器响应时,另一个函数会验证有效负载,然后将结果附加到该请求对象

waitingResponse = {
  163: { sent: 1583253453549, action: "none" }
}
Run Code Online (Sandbox Code Playgroud)

客户端每隔 x 毫秒检查该对象的action密钥

我有一个函数发送有效负载,然后等待来自(下面的函数)sendPayload的值。awaitResponse目前这个功能还不能用。我尝试制作两个单独的函数,一个是 setTimeout 计时器,另一个是 Promise。我还尝试将两者放在同一个函数中,并确定它是循环还是带有参数的承诺,original您可以在下面看到。现在我认为即使在循环中,函数也应该始终返回一个承诺,但我似乎无法使用计时器来实现这一点,而且我担心彼此之间存在多个承诺的成本。假设我每 5 毫秒检查一次响应,超时时间为 2000 毫秒。这是很多承诺。

public async sendPayload(details) {
    console.log("sendPlayload", details);

    this.waitingResponse[details.requestId] = { sent: +new Date() };

    if (this.socket.readyState === WebSocket.OPEN) {
        this.socket.send(JSON.stringify(details));
    }
    const bindAwaitResponse = this.awaitResponse.bind(this);
    return new Promise(async function (resolve, reject) {
        const result = await bindAwaitResponse(details.requestId, true);
        console.log("RES", result);
        console.info("Time took", (+new Date() - result.sent) / 1000);

        resolve(result);
    });

}

public async awaitResponse(requestId, original) {
    // console.log(requestId, "awaitResponse")
    return new Promise((resolve, reject) => {
        // Is it a valid queued request
        if (this.waitingResponse[requestId]) {
            // Do we have an answer?
            if (this.waitingResponse[requestId].hasOwnProperty("action")) {
                console.log(requestId, "Got a response");
                const tmp = this.waitingResponse[requestId];
                delete this.waitingResponse[requestId]; // Cleanup
                resolve(tmp);
            } else {
                // No answer yet from remote server
                // console.log("no answer: ", JSON.stringify(this.waitingResponse));
                // Check if request took too long
                if (+new Date() - this.waitingResponse[requestId].sent > 5000) { // TODO: Option for time out
                    console.warn(requestId, "Request timed out");

                    // Timed out, result took too long
                    // TODO: Option, default action when timed out
                    delete this.waitingResponse[requestId];  // Cleanup
                    resolve({
                        action: "to" // For now, just sent a timeout action, maybe the default action should be outside of the Network class?
                    })
                } else {
                    // console.log(requestId, "Still waiting for results");
                    console.log(JSON.stringify(this.waitingResponse));
                    // Still waiting, after x ms, recall function
                    return setTimeout(async () => { resolve(await this.awaitResponse(requestId, false)); }, 200);
                }
            }
        }
    });
}

private async processMessage(msg) {
    console.log("WS received Message", JSON.stringify(msg.data));

    console.log("Current: ", JSON.stringify(this.waitingResponse));

    let data = JSON.parse(msg.data);
    // console.log("Received: ", data);


    if (data.hasOwnProperty("requestId") && this.waitingResponse[data.requestId]) {
        // console.log("processMessage ID found");
        this.waitingResponse[data.requestId] = { ...data, ...this.waitingResponse[data.requestId] };

    }
}
Run Code Online (Sandbox Code Playgroud)

注意:我把websocket标签放在下面是因为我很努力地寻找它。也许我在没有意识到的情况下找到了解决方案,但是如果您有更好的标签可以更轻松地找到这个问题,请编辑它们:)

Ber*_*rgi 6

是的,您将许多回调风格的函数与中间的 Promise 和async/混合在一起await。等待响应时不要进行轮询!相反,在编写队列时,将resolve函数本身放入队列中,以便您可以直接履行/拒绝来自响应处理程序的相应承诺。

在你的情况下:

public async sendPayload(details) {
    const request = this.waitingResponse[details.requestId] = { sent: +new Date() };
    try {
        if (this.socket.readyState === WebSocket.OPEN) {
           this.socket.send(JSON.stringify(details));
        }
        const result = await new Promise(function(resolve) {
            request.resolve = resolve;

            setTimeout(() => {
                reject(new Error('Timeout')); // or resolve({action: "to"}), or whatever
            }, 5000);
        });
        console.info("Time took", (+new Date() - request.sent) / 1000);
        return result; // or {...request, ...result} if you care
    } finally {
        delete this.waitingResponse[details.requestId];
    }
}


private async processMessage(msg) {
    let data = JSON.parse(msg.data);

    if (data.hasOwnProperty("requestId") {
        const request = this.waitingResponse[data.requestId]
        if (request)
            request.resolve(data)
        else
            console.warn("Got data but found no associated request, already timed out?", data)
    } else {
        console.warn("Got data without request id", data);
    }
}
Run Code Online (Sandbox Code Playgroud)

如果函数不需要有关请求的任何详细信息,您甚至可以完全删除该request对象,只存储resolve函数本身。processMessage