使用 XMLHttpRequest 处理内存高效的消息块

Roe*_*den 5 javascript xmlhttprequest stream node.js

我有一个带有 progress 事件处理程序的 XMLHttpRequest,它请求一个分块传输的网页,该网页会不断地发送新的消息块。如果我不设置 responseType,就可以在每个 progress 事件中访问 XMLHttpRequest 的 response 属性并处理新增的消息块。这种方法的问题是浏览器必须将整个响应保存在内存中,最终浏览器会因为这种内存浪费而崩溃。

所以,我尝试把 responseType 设置为 arraybuffer,希望可以对缓冲区进行切片,避免之前那种过多的内存浪费。不幸的是,此时 progress 事件处理程序不再能够读取 XMLHttpRequest 的 response 属性。progress 事件的事件参数也不包含缓冲区。下面是我这次尝试的一个简短的、独立的示例(为 node.js 编写):

var http = require('http');

// -- The server.

// Route '/stream' to the endless chunked stream; every other URL gets the
// HTML page that embeds the browser-side client code.
http.createServer(function(req, res) {
  var respond = (req.url === '/stream') ? serverStream : serverMain;
  respond(res);
}).listen(3000);

// -- The server functions to send a HTML page with the client code, or a stream.

/**
 * Sends the HTML page that carries the browser client.
 *
 * The page consists of a static body followed by an inline script made of the
 * source text of client() plus a call to it, so the browser runs the same
 * function that is defined in this file.
 *
 * @param {object} res - response object; writeHead, write and end are called on it.
 */
function serverMain(res) {
  var headers = {'Content-Type': 'text/html'};
  res.writeHead(200, headers);
  res.write('<html><body>Hello World</body><script>');

  // Serialize client() and append the call that starts it in the browser.
  var bootstrap = [client.toString(), ';client();</script></html>'].join('');
  res.end(bootstrap);
}

/**
 * Starts an endless chunked response: one 'Hello World' line per second.
 *
 * The interval is cleared when the response emits 'close' (the client went
 * away), so a disconnected client no longer leaves a timer behind that keeps
 * writing to a dead connection.
 *
 * @param {object} res - response object; writeHead, write and on('close') are used.
 */
function serverStream(res) {
  res.writeHead(200, {'Content-Type': 'text/html'});

  var timer = setInterval(function() {
    res.write('Hello World<br />\n');
  }, 1000);

  // Without this, every request would leak one timer for the life of the process.
  res.on('close', function() {
    clearInterval(timer);
  });
}

// -- The client code which runs in the browser.

/**
 * Browser-side code: requests '/stream' as an ArrayBuffer and logs, on every
 * progress event, how many bytes of the response are available.
 *
 * The function is sent to the browser as source text (see serverMain), so it
 * must not rely on anything defined outside its own body.
 */
function client() {
  var xhr = new XMLHttpRequest();
  xhr.addEventListener('progress', function() {
    if (!xhr.response) return console.log('progress without response :-(');
    // An ArrayBuffer reports its length as byteLength; `size` exists on Blob
    // only and would always print "undefined" here.
    console.log('progress: ' + xhr.response.byteLength);
  }, false);
  xhr.open('GET', '/stream', true);
  xhr.responseType = 'arraybuffer';
  xhr.send();
}
Run Code Online (Sandbox Code Playgroud)

progress 事件处理程序无法访问我想要的 response。如何以节省内存的方式在浏览器中处理消息块?请不要建议使用 WebSocket。我不希望仅仅为了处理一个只读的消息块流而使用它。

spe*_*bus 4

XMLHttpRequest 似乎并不是真正为这种用途而设计的。显而易见的解决方案是轮询,这是 XMLHttpRequest 的一种流行用法,但我猜您不想错过流中那些会在两次调用之间溜走的数据。

对于我的问题Can the "real" data chunks be identified in some way or is it basically random data ?,你回答了With some effort, the chunks could be identified by adding an event-id of sorts to the server-side

基于这个前提,我建议:

想法:合作并发侦听器

  1. 连接到流并设置进度侦听器(称为listenerA())。
  2. 当一个块到达时,对其进行处理并输出。保留对 listenerA() 收到的第一个和最后一个块的 id 的引用。统计 listenerA() 已收到多少块。
  3. listenerA()收到一定数量的块后,生成另一个“线程”(连接 + 侦听器listenerB()),与第一个线程并行执行步骤 1 和 2,但将处理后的数据保留在缓冲区中而不是输出。
  4. 当 listenerA() 接收到与 listenerB() 收到的第一个块具有相同 id 的块时,向 listenerB() 发送信号,断开第一个连接并终止 listenerA()。
  5. 当 listenerB() 收到来自 listenerA() 的终止信号时,将缓冲区转储到输出并继续正常处理。
  6. 让 listenerB() 在与之前相同的条件下生成 listenerC()。
  7. 根据需要不断重复尽可能多的连接和侦听器。

通过使用两个重叠的连接,您可以防止由于删除单个连接然后重新连接而可能导致的块丢失。

笔记

  • 这假设所有连接的数据流都是相同的,并且不会引入一些个性化设置。
  • 根据流的输出速率和连接延迟,从一个连接转换到另一个连接期间的缓冲区转储可能会很明显。
  • 您还可以测量总响应大小而不是块计数来决定何时切换到新连接。
  • 可能有必要保留一个完整的块 id 列表来进行比较,而不仅仅是第一个和最后一个,因为我们无法保证重叠的时间。
  • XMLHttpRequest 的 responseType 必须设置为其默认值 "" 或 "text",才能返回文本。其他数据类型不会返回部分 response。请参阅 https://xhr.spec.whatwg.org/#the-response-attribute

在node.js中测试服务器

以下代码是一个 Node.js 服务器,它输出一致的元素流以用于测试目的。您可以打开多个连接,跨会话的输出将是相同的,减去可能的服务器延迟。

http://localhost:5500/stream

将返回数据,其中 id 是递增的数字

http://localhost:5500/streamRandom

将返回数据,其中 id 是随机的 40 个字符长的字符串。这是为了测试不能依赖 id 来排序数据的场景。

var crypto = require('crypto');

// Shared stream state: the numeric id and its 40-character hex counterpart.
// Both start at "zero" and are advanced together once per second.
var nodeId     = 0;
var nodeIdRand = '0000000000000000000000000000000000000000';

setInterval(function advanceNodeId() {

    // regular id: next number
    nodeId += 1;

    // random-looking id: sha1 hex digest of the numeric id's decimal text
    var hash = crypto.createHash('sha1');
    hash.update(nodeId.toString());
    nodeIdRand = hash.digest('hex');
}, 1000);


// create server  (port 5500)
var http = require('http');

// Routes:
//   /stream        chunks whose id is the incrementing number
//   /streamRandom  chunks whose id is the 40-character hex string
//   anything else  404, so the request is answered instead of hanging forever
http.createServer(function(req, res) {

    if(req.url === '/stream') {
        return serverStream(res);
    }
    if(req.url === '/streamRandom') {
        return serverStream(res, true);
    }

    // unknown path: previously no response was ever sent for it
    res.writeHead(404, {'Content-Type': 'text/plain'});
    res.end('Not found');
}).listen(5500);


// serve nodeId
/**
 * Streams the current node id to one client as '[node id="..."]' chunks.
 *
 * Every 250 ms the shared nodeId is compared with the last id written to this
 * response; a chunk is written only when it changed. The polling timer is
 * cleared when the response emits 'close', so a disconnected client no longer
 * leaves a timer behind that keeps writing to a dead connection.
 *
 * @param {object}  res    response object; writeHead, write and on('close') are used
 * @param {boolean} [rand] when truthy, write nodeIdRand instead of nodeId
 */
function serverStream(res, rand) {

    // headers (CORS header lets a page from another origin read the stream)
    res.writeHead(200, {
        'Content-Type'                : 'text/plain',
        'Access-Control-Allow-Origin' : '*',
    });

    // remember last served id
    var last = null;

    // output interval
    var timer = setInterval(function() {

        // output on new node
        if(last !== nodeId) {
            res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]');
            last = nodeId;
        }
    }, 250);

    // stop polling once the client is gone, otherwise one timer leaks per request
    res.on('close', function() {
        clearInterval(timer);
    });
}
Run Code Online (Sandbox Code Playgroud)

使用上述 Node.js 服务器代码进行概念验证

<!DOCTYPE html>
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    </head>
    <body>
        <button id="stop">stop</button>
        <div id="output"></div>
        <script>

/*
Listening to a never ending page load (http stream) without running out of
memory by using concurrent overlapping connections to prevent loss of data,
using only XMLHttpRequest, under the condition that the data can be identified.

How it works
    Each connection ("fetcher") is a text XMLHttpRequest whose response keeps
    growing. After chunkMax chunks the fetcher spawns a successor that buffers
    what it receives. As soon as the oldest fetcher has received the first
    chunk its successor saw, the two overlap: duplicates are dropped from the
    successor, the oldest fetcher is aborted, and whatever the successor had
    buffered beyond the overlap is written out.

listen arguments
    url         url of the http stream
    chunkMax    number of chunks to receive before switching to new connection

listen properties
    output           a reference to a DOM element with id "output"
    queue            an array filled with non-duplicate received chunks and metadata
    lastFetcherId    an incrementing number used to assign an id to new fetchers
    fetchers         an array listing all active fetchers, oldest first

listen methods
    fire        internal use    fire an event
    stop        external use    stop all connections
    fetch       internal use    starts a new connection
    fetchRun    internal use    initialize a new fetcher object

Notes
    The events 'chunk-ready', 'queue-new' and 'fetch-kill' are dispatched on
    document.
    A chunk is identified by the first id="..." (lowercase letters and digits)
    found in the newly received text.

Usage

    var myListen = new listen('http://localhost:5500/streamRandom', 20);
        will listen to url "http://localhost:5500/streamRandom"
        will switch connections every 20 chunks

    myListen.stop()
        will stop all connections in myListen
*/
function listen(url, chunkMax) {

    // main ref
    var that = this;

    // output element
    that.output = document.getElementById('output');

    // main queue
    that.queue = [];

    // last fetcher id
    that.lastFetcherId = 0;

    // list of fetchers
    that.fetchers = [];




    //********************************************************* event dispatcher
    that.fire = function(name, data) {
        document.dispatchEvent(new CustomEvent(name, {'detail': data}));
    };




    //******************************************************** kill all fetchers
    that.stop = function() {
        that.fire('fetch-kill', -1);
    };




    //************************************************************** url fetcher
    that.fetch = function(fetchId, url, fetchRef) {

        // number of response characters already handed over as chunks
        var len = 0;
        var xhr = new XMLHttpRequest();


        // progress listener
        function onProgress() {

            // text received since the last chunk that was handed over
            var chunkData = xhr.response.slice(len);

            // chunk id; without a complete id (empty or partially received
            // data) leave len untouched so this text is read again together
            // with the next progress event instead of throwing
            var idMatch = chunkData.match(/id="([a-z0-9]+)"/);
            if(!idMatch) {
                return;
            }

            // update response end point
            len = xhr.response.length;

            // signal end of chunk processing
            that.fire('chunk-ready', {
                'fetchId'   : fetchId,
                'fetchRef'  : fetchRef,
                'chunkId'   : idMatch[1],
                'chunkData' : chunkData,
            });
        }


        // kill switch
        function onKill(e) {

            // only react when this fetcher or all fetchers (-1) are targeted
            if(e.detail !== fetchId && e.detail !== -1) {
                return;
            }

            xhr.removeEventListener('progress', onProgress);
            document.removeEventListener('fetch-kill', onKill);

            xhr.abort();

            // remove this very fetcher from the list (not just the first entry)
            var index = that.fetchers.indexOf(fetchRef);
            if(index > -1) {
                that.fetchers.splice(index, 1);
            }

            // drop the reference to the request and its accumulated response
            xhr = null;
        }

        xhr.addEventListener('progress', onProgress, false);
        document.addEventListener('fetch-kill', onKill, false);


        // go
        xhr.open('GET', url, true);
        xhr.responseType = 'text';
        xhr.send();
    };




    //****************************************************** start a new fetcher
    that.fetchRun = function() {

        // new id
        var id = ++that.lastFetcherId;

        // create fetcher with new id
        var fetchRef = {
            'id'           : id,    // self id
            'queue'        : [],    // internal queue
            'chunksIds'    : [],    // retrieved ids, also used to count
            'hasSuccessor' : false, // keep track of next fetcher spawn
            'ignoreId'     : null,  // when set, ignore chunks until this id is received (this id included)
        };
        that.fetchers.push(fetchRef);

        // run fetcher
        that.fetch(id, url, fetchRef);
    };




    //************************************************ a fetcher returns a chunk
    document.addEventListener('chunk-ready', function(e) {

        // shorthand
        var f   = e.detail;
        var ref = f.fetchRef;

        // chunk of a fetcher this instance does not (or no longer) track
        if(that.fetchers.indexOf(ref) < 0) {
            return;
        }

        // ignore flag is not set, process chunk
        if(ref.ignoreId == null) {

            // store chunk id
            ref.chunksIds.push(f.chunkId);

            // create queue item
            var queueItem = {'id': f.chunkId, 'data': f.chunkData};

            // chunk is received from oldest fetcher: straight to the main queue
            if(f.fetchId === that.fetchers[0].id) {
                that.queue.push(queueItem);
                that.fire('queue-new');
            }
            // not oldest fetcher: buffer in the fetcher internal queue
            else {
                ref.queue.push(queueItem);
            }
        }
        // ignore flag is set, current chunk is the last one to ignore
        else if(ref.ignoreId === f.chunkId) {
            ref.ignoreId = null;
        }


        //******************** check chunks count for fetcher, threshold reached
        if(ref.chunksIds.length >= chunkMax && !ref.hasSuccessor) {

            // remember the spawn
            ref.hasSuccessor = true;

            // spawn new fetcher
            that.fetchRun();
        }


        /***********************************************************************
        check if the first chunk of the second oldest fetcher exists in the
        oldest fetcher.
        If true, then they overlap and we can kill the oldest fetcher
        ***********************************************************************/
        var oldest    = that.fetchers[0];
        var successor = that.fetchers[1];

        if(
            // only chunks of the oldest fetcher can complete the overlap
            f.fetchId !== oldest.id
            // no successor, or it has not received anything yet
            || !successor
            || successor.chunksIds.length === 0
            // oldest fetcher has not received the first chunk of its successor
            || oldest.chunksIds.indexOf(successor.chunksIds[0]) < 0
        ) {
            return;
        }

        // position of the last chunk of the oldest fetcher within its successor
        var lastChunkId    = oldest.chunksIds[oldest.chunksIds.length - 1];
        var lastChunkIndex = successor.chunksIds.indexOf(lastChunkId);

        // successor has not reached its parent last chunk: everything it
        // buffered is a duplicate
        if(lastChunkIndex < 0) {

            // discard whole queue
            successor.queue     = [];
            successor.chunksIds = [];

            // ignore upcoming duplicates up to and including this id
            successor.ignoreId = lastChunkId;
        }
        // successor is level with or ahead of its parent: keep only what
        // comes after the parent last chunk
        else {
            var trimStart = lastChunkIndex + 1;
            successor.queue     = successor.queue.slice(trimStart);
            successor.chunksIds = successor.chunksIds.slice(trimStart);
        }

        // kill oldest fetcher, the successor becomes the oldest one
        that.fire('fetch-kill', oldest.id);

        // dump what the successor buffered beyond the overlap; these chunks
        // were never seen by the killed fetcher and would otherwise be lost
        if(successor.queue.length > 0) {
            Array.prototype.push.apply(that.queue, successor.queue);
            successor.queue = [];
            that.fire('queue-new');
        }
    }, false);




    //***************************************************** main queue processor
    document.addEventListener('queue-new', function() {

        // process chunks in queue
        while(that.queue.length > 0) {

            // get chunk and remove from queue
            var chunk = that.queue.shift();

            // output item to document (chunk text is inserted as HTML)
            if(that.output) {
                that.output.innerHTML += "<br />"+chunk.data;
            }
        }
    }, false);



    //****************************************************** start first fetcher
    that.fetchRun();
};


// run
// Create the listener for the random-id test stream; the second argument (20)
// is chunkMax, the number of chunks received before a new connection is opened.
var process = new listen('http://localhost:5500/streamRandom', 20);

// bind global kill switch to button
// process.stop can be passed unbound: it fires 'fetch-kill' with -1 through the
// instance captured inside listen and does not use `this`.
document.getElementById('stop').addEventListener('click', process.stop, false);

        </script>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)