带有node.js管道接收器的ZeroMQ会在一段时间后停止接收消息

Flo*_*oby 9 javascript pipeline zeromq node.js

我一直在尝试设置呼吸机/工作人员/接收器模式以便抓取页面,但我从未经历过测试阶段.我的设置的一个特点是水槽与呼吸机处于同一过程中.所有节点都使用ipc:// transport.目前只交换测试消息.呼吸机发送任务,工作人员接收它们并等待然后向水槽发送确认.

症状:在一段时间(通常少于5分钟)后,即使呼吸机继续发送任务并且工作人员继续接收它们并发送确认消息,接收器也会停止接收确认消息.

我知道确认已发送,因为如果我重新启动我的接收器,它会在启动时获取所有丢失的消息.

我以为ZeroMQ处理了自动重新连接.

通风机/宿

var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
    push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
    push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got message', args.join(' '));
});
Run Code Online (Sandbox Code Playgroud)

worker.js

var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);

pull.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got job ', args.join(' '));
    setTimeout(function() {
        console.log('job done ', args.join(' '));
        sink.send(['job done', args.join(' ')]);
    }, Math.random() * 5 * 1000);
});
Run Code Online (Sandbox Code Playgroud)

编辑我尝试将接收器移动到另一个进程,它似乎工作.但是我真的希望它生活在同一个进程中,并且我在每个进程处理多个zmq套接字时观察到类似的行为,无论使用何种模式

编辑我正在使用这个模块https://github.com/JustinTulloss/zeromq.node

Jac*_*ter 4

我不一定希望这个答案被接受,但我将其放在这里以供参考。有一个非常忠实的纯节点模块,称为Axon,它受到 ZeroMQ 的启发。

  • Axon 没有编译的依赖项,并重新创建与 ZeroMQ 相同的套接字类型。
  • Axon 还基于 pub/sub 套接字类型构建来创建网络事件发射器。
  • 最后,ZMQ 的 req/rep 套接字不适用于 Node.js,因为 ZMQ 期望回复同步发生。作为原生 Node,Axon 库可以正确处理 req/rep 模式。

注意: ZMQ 和 Axon 不能互操作。