Ben*_*une 7 javascript tcp node.js promise ecmascript-6
我有一个这样的课:
import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';
class MyClass extends EventEmitter {
constructor(host = 'localhost', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on('connect', this.handle.bind(this));
}
handle(data) {
this.socket.on('data', data => {
});
}
send(data) {
this.socket.write(data);
}
}
Run Code Online (Sandbox Code Playgroud)
如何将send方法转换为promise,从socket的data事件中返回一个值?除了可以轻松抑制的连接消息之外,服务器仅在向其发送数据时发回数据.
我尝试过类似的东西:
handle(data) {
this.socket.on('data', data => {
return this.socket.resolve(data);
});
this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
return new Promise((resolve, reject) => {
this.socket.resolve = resolve;
this.socket.reject = reject;
this.socket.write(data);
});
}
Run Code Online (Sandbox Code Playgroud)
显然这不起作用,因为resolve/ reject当链接和/或send多次并行调用时,/ 将覆盖彼此.
还send存在并行调用两次的问题,它可以解决先回复的响应.
我目前有一个使用队列和延期的实现,但由于队列不断被检查,它感觉很乱.
我希望能够做到以下几点:
let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
return c.send('bar', response.param);
//`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
console.log(response);
}).catch(error => console.log(error));
Run Code Online (Sandbox Code Playgroud)
只是要添加,我无法控制收到的数据,这意味着它无法在流之外进行修改.
编辑:所以看起来这是非常不可能的,因为TCP没有请求 - 响应流.如何使用promises实现它,但是使用单次执行(一次一个请求)promise链或队列.
我将问题提炼到最低限度,并使其可以在浏览器中运行:
EventEmitter。该解决方案的工作原理是将新请求附加到承诺链,但在任何给定时间点最多允许一个打开/未答复的请求。.send每次调用时都会返回一个新的 Promise,并且该类负责所有内部同步。因此,.send可以多次调用并保证请求处理的正确顺序(先进先出)。我添加的一项附加功能是,如果没有待处理的请求,则修剪承诺链。
警告我完全省略了错误处理,但无论如何应该根据您的特定用例进行定制。
class SocketMock {
constructor(){
this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) );
this.listeners = {
// 'error' : [],
'data' : []
}
}
send(data){
console.log(`SENDING DATA: ${data}`);
var response = `SERVER RESPONSE TO: ${data}`;
setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),
Math.random()*2000 + 250);
}
on(event, callback){
this.listeners[event].push(callback);
}
}
class SingleRequestCoordinator {
constructor() {
this._openRequests = 0;
this.socket = new SocketMock();
this._promiseChain = this.socket
.connected.then( () => console.log('SOCKET CONNECTED'));
this.socket.on('data', (data) => {
this._openRequests -= 1;
console.log(this._openRequests);
if(this._openRequests === 0){
console.log('NO PENDING REQUEST --- trimming the chain');
this._promiseChain = this.socket.connected
}
this._deferred.resolve(data);
});
}
send(data) {
this._openRequests += 1;
this._promiseChain = this._promiseChain
.then(() => {
this._deferred = Promise.defer();
this.socket.send(data);
return this._deferred.promise;
});
return this._promiseChain;
}
}
var sender = new SingleRequestCoordinator();
sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
setTimeout(() => sender.send('data-4')
.then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
456 次 |
| 最近记录: |