如何检测相关事件的爆发,然后发出一个事件

rvr*_*bit 5 javascript node.js eventemitter

这是我的第一个问题!我很想把它问好,所以如果有更好的提问方式,请帮我改进它.

这是我找到的唯一一个稍微相关的问题,但我想不出如何把它和我想做的事情联系起来(他们的问题是针对jQuery的;我的则大体上是针对Node.js的 [不过我找到了一个浏览器版本的EventEmitter,并且能够在浏览器中测试]): 使用jQuery为每个事件突发运行一次函数


问题

我知道一个过程会在一段时间内发出一连串事件.

为了模拟这个过程,我编写了这段代码:

/*
 * Event-emitter scaffolding, taken from the Node.js "events" documentation
 * (https://nodejs.org/api/events.html). The only addition is the "burstID"
 * argument handed to the listener.
 */
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

// Log every raw event together with the burstID it was emitted with.
myEmitter.on('event', function logRawEvent(burstID) {
  console.log('an event occurred!', burstID);
});

// Timing of the simulated bursts, all in milliseconds.
const millisecondsToSustainBurst = 3000 ;                 // how long a single burst keeps going
const millisecondsBetweenPossibleEventEmissions = 200 ;   // gap between emission attempts within a burst
const millisecondsUntilStartNextBurst = 5000 ;            // gap between the starts of consecutive bursts
const millisecondsUntilNoMoreBursts = 23000 ;             // once this much time has passed, no new burst starts

// Announce the start time and the approximate end of the whole run.
const now = new Date() ;
console.log(`Time now: ${now}; should run until about ${new Date(now.getTime() + millisecondsUntilNoMoreBursts)}`) ;

/*
 * Runs one burst: on every call there is a coin-flip chance of emitting 'event'
 * (with burstID) on myEmitter, and the function re-schedules itself every
 * millisecondsBetweenPossibleEventEmissions until more than
 * millisecondsToSustainBurst have elapsed since startTimestamp.
 */
const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
    const shouldEmit = Math.random() > 0.5 ;
    if (shouldEmit) {
        myEmitter.emit('event', burstID) ;
    }

    // Stop re-scheduling once the burst window has been exceeded.
    const elapsedMilliseconds = new Date() - startTimestamp ;
    if (elapsedMilliseconds > millisecondsToSustainBurst) {
        return ;
    }

    const attemptAgain = () => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID) ;
    setTimeout(attemptAgain, millisecondsBetweenPossibleEventEmissions) ;
}

/*
 * Starts a new random-event burst every millisecondsUntilStartNextBurst.
 * Once more than millisecondsUntilNoMoreBursts have elapsed since startTimestamp,
 * no further burst is started and callback is invoked instead.
 * Each burst lasts for the file-level millisecondsToSustainBurst and gets an ID
 * built from the hour, minute and second at which it started.
 */
const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
    const elapsedMilliseconds = new Date() - startTimestamp ;
    if (elapsedMilliseconds > millisecondsUntilNoMoreBursts) {
        callback() ;
        return ;
    }

    const burstStart = new Date() ;
    console.log(`Time now: ${burstStart}; starting random-event burst which will run for ${millisecondsToSustainBurst / 1000} seconds.  `) ;

    // Queue the next burst before starting the current one.
    const startNextBurst = () => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) ;
    setTimeout(startNextBurst, millisecondsUntilStartNextBurst) ;

    doRandomEmitBurst(
        new Date() ,
        millisecondsToSustainBurst ,
        `burstThatStartedAt${burstStart.getHours()}h${burstStart.getMinutes()}m${burstStart.getSeconds()}s`
    ) ;
}

// Kick off the simulation: bursts start now and repeat until the overall time
// limit has passed, at which point the completion time is logged.
doRegularRandomBursts(
    new Date() ,
    millisecondsUntilStartNextBurst ,
    millisecondsUntilNoMoreBursts ,
    () => console.log(`Done at ${new Date()}`)
) ;

// Separate emitter instance; presumably meant to carry the "burst has ended" event.
const myBurstDetectedEmitter = new MyEmitter() ;
// NOW, what do I do HERE to say:
// I've seen a number of events occur in a 5-second period
// Now they've stopped
// Therefore I'm going to emit a different kind of event
Run Code Online (Sandbox Code Playgroud)

现在,假设我想监听这些突发事件的发生.

我想在采取进一步行动之前确保爆发已经结束.

我该怎么办?


到目前为止我尝试过的

首先,我可以创建一个全局"var"(哎 - 我想避免变体),如下所示:

// Registry mapping each key to the Date of its most recent event.
// Created without a prototype so that keys such as "toString" or "constructor"
// are not mistaken for entries that already exist.
const timeLastUpdated = Object.create(null) ;
Run Code Online (Sandbox Code Playgroud)

…然后…

/**
 * Polls timeLastUpdated[keyForUpdateCheck] until it stops changing, then
 * removes the entry and invokes callback.
 *
 * On each round the current entry (or a fresh Date when there is none yet) is
 * remembered; after the wait, the entry is compared by identity against that
 * snapshot. An unchanged entry means no new event arrived during the wait.
 *
 * @param {string} keyForUpdateCheck - key of the entry in timeLastUpdated to watch
 * @param {Function} callback - called with no arguments once the entry has stopped changing
 * @param {number} [millisecondsBetweenChecks=5000] - wait between two consecutive checks
 */
function keepCheckingTimeLastUpdated(keyForUpdateCheck, callback, millisecondsBetweenChecks = 5000) {
    const currentEntry = timeLastUpdated[keyForUpdateCheck] ;
    // A fresh Date can never be identical to a stored entry, so a missing entry always leads to another round.
    const timestampAtScheduling = (typeof currentEntry !== 'undefined' ? currentEntry : (new Date())) ;
    setTimeout(() => {
        console.log(`checking if modifications to "${keyForUpdateCheck}" have occurred since ${timestampAtScheduling}`) ;
        if (timeLastUpdated[keyForUpdateCheck] === timestampAtScheduling) {
            delete timeLastUpdated[keyForUpdateCheck] ;
            callback() ;
        }
        else keepCheckingTimeLastUpdated(keyForUpdateCheck, callback, millisecondsBetweenChecks) ;
    }, millisecondsBetweenChecks) ;
}

/*
 * Builds the listener that records event arrivals in timeLastUpdated.
 * The first event seen for a key starts a keepCheckingTimeLastUpdated watcher
 * that logs "<key> changed" when it finishes; every event then stores its
 * arrival time under that key.
 */
const makeNotificationHandler = () => {
    const handleNotification = (keyForUpdateCheck) => {
        const receivedAt = new Date() ;
        const hasNoEntryYet = typeof timeLastUpdated[keyForUpdateCheck] === 'undefined' ;
        if (hasNoEntryYet) {
            const announceChange = () => console.log(`${keyForUpdateCheck} changed`) ;
            // Started before the entry is written, exactly as the watcher expects.
            keepCheckingTimeLastUpdated(keyForUpdateCheck, announceChange) ;
        }
        timeLastUpdated[keyForUpdateCheck] = receivedAt ;
    } ;
    return handleNotification ;
} ;

// Route every 'event' emission on myEmitter through a handler built by makeNotificationHandler.
myEmitter.on('event', makeNotificationHandler()) ;
Run Code Online (Sandbox Code Playgroud)

这看起来就是一个反模式(希望我正确使用了这个术语).我的直觉告诉我,使用全局对象是错误的做法,应该有一个更接近函数式编程的解决方案.


只对有兴趣的人:

(随意回答问题)

额外的复杂性:在我的示例代码中,"burstID"永远不会重复,但在实际场景中它可能会重复.我想等到自某个"burstID"上次出现以来已经过去一定时间,再断定这次突发的变更是否真的结束了.

作为背景,在实际应用程序中,我使用node-postgres在PostgreSQL数据库上设置"LISTEN"."burstID"是某个表的主键,同时也在其他多个表中用作外键.我正在监听所有使用这个共享键的表,而我收到的消息中包含该键.

rvr*_*bit 0

在上面评论的帮助下回答我自己的问题。

十分感谢Scott Sauyet的帮助和鼓励。

我决定创建一种我称之为"accumulomator"的东西:一个自动化的累加器(accumulator)。消息被路由到这些accumulomator,我把它们想象成仓库里拿着箱子和秒表的人。

一旦accumulomator被实例化,它就开始盯着秒表。每次秒表走到头时,accumulomator都会查看手里的这堆消息是否与上次检查时相同。如果相同,accumulomator就把消息打包交给仓库,然后找个阳光明媚的地方退休。

完整的更新后代码如下。欢迎任何改进代码的编辑。

// To test in a browser, use:
// https://github.com/Olical/EventEmitter
// …in place of const EventEmitter = require('events');

/*******************************************************/
/*         This part taken directly from               */
/*       https://nodejs.org/api/events.html            */
/*          (with addition of "burstID")               */
/* */                                               /* */
/* */ const EventEmitter = require('events');       /* */
/* */                                               /* */
/* */ class MyEmitter extends EventEmitter {}       /* */
/* */                                               /* */
/* */ const myEmitter = new MyEmitter();            /* */
/* */ myEmitter.on('event', (burstID) => {          /* */
/*      console.log('an event occurred!', burstID);    */
/* */ });                                           /* */
/* */                                               /* */
/*******************************************************/

// Timing of the simulated bursts, all in milliseconds.
const millisecondsToSustainBurst = 3000 ;
const millisecondsBetweenPossibleEventEmissions = 200 ;
const millisecondsUntilStartNextBurst = 5000 ;
const millisecondsUntilNoMoreBursts = 23000 ;

const now = new Date() ;
console.log('Bursts starting.  Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ;

// Runs one burst: each call has a coin-flip chance of emitting 'event', and the
// function re-schedules itself until the burst window has been exceeded.
const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
    if (Math.random() > 0.5) myEmitter.emit('event', burstID) ;
    if (
        !((new Date()) - startTimestamp > millisecondsToSustainBurst)
    ) setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ;
} ;

// Starts a new burst every millisecondsUntilStartNextBurst; once the overall
// time limit has been exceeded, calls callback instead of starting another burst.
const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
    if (
        !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts)
    ) {
        const now = new Date() ;
        console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds.  ') ;
        setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ;
        doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ;
    } else callback() ;
} ;

doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ;


/**
 * Creates an "accumulomator": it collects messages and, every
 * config.millisecondsBetweenChecks, checks whether the newest message is still
 * the one that was newest at the previous check. When it is, the burst is
 * considered over: the accumulomator stops and config.callback is invoked.
 *
 * @param {object} config
 * @param {Function} config.callback - called with no arguments when no new message arrived between two checks
 * @param {number} config.millisecondsBetweenChecks - integer wait between checks
 * @param {Function} [config.onStop] - called at the next check after stopOnNextCheck() was requested
 * @returns {object} object with receiveMessage, stopOnNextCheck, isActive and getAccumulatedMessages
 * @throws {Error} when the configuration is missing or malformed
 */
const makeAccumulomator = (config) => {
    // typeof null is 'object', so null has to be rejected explicitly.
    if (typeof config !== 'object' || config === null) throw new Error('Must specify configuration object.') ;
    if (typeof config.callback !== 'function') throw new Error('Must specify callback function for when the end of new messages is reached.') ;
    if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
    if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
    if (typeof config.onStop !== 'function' && typeof config.onStop !== 'undefined') throw new Error('If defined at all, onStop must be a function.') ;

    const accumulomator = {} ;

    const accumulatedMessages = [] ;
    let stop = false ;

    const keepCheckingTimeLastUpdated = (callback) => {
        // Snapshot of the newest message's timestamp. With no message yet, a fresh
        // Date is used, which can never be identical to a stored timestamp.
        const timestampAtScheduling = (accumulatedMessages.length > 0 ? accumulatedMessages[accumulatedMessages.length - 1].timestamp : (new Date())) ;
        setTimeout(() => {
            if (stop) {
                // Stop was requested from outside since the last check.
                if (typeof config.onStop === 'function') config.onStop() ;
            }
            else if (accumulatedMessages.length < 1) keepCheckingTimeLastUpdated(callback) ;
            else if (accumulatedMessages[accumulatedMessages.length - 1].timestamp === timestampAtScheduling) {
                // Same newest message as at the previous check: the burst has ended.
                stop = true ;
                callback() ;
            }
            else keepCheckingTimeLastUpdated(callback) ;
        }, config.millisecondsBetweenChecks) ;
    } ;
    keepCheckingTimeLastUpdated(config.callback) ;

    accumulomator.receiveMessage = (message) => accumulatedMessages.push({ message: message, timestamp: (new Date()) }) ;

    accumulomator.stopOnNextCheck = () => {
        if (stop === true) throw new Error('Accumulomator is already stopped.') ;
        stop = true ;
    } ;

    accumulomator.isActive = () => stop === false ;

    accumulomator.getAccumulatedMessages = () => accumulatedMessages ;

    return accumulomator ;
} ;

/**
 * Creates a warehouse that routes each incoming message to the accumulomator
 * named by config.messageRouter(message), creating accumulomators on demand
 * and discarding them once they are no longer active.
 *
 * @param {object} config
 * @param {Function} config.callback - called with { key, messages? } when an accumulomator detects the end of its burst
 * @param {number} config.millisecondsBetweenChecks - integer wait between checks, passed to every accumulomator
 * @param {Function} config.messageRouter - maps a message to the (string) name of its accumulomator
 * @param {boolean} [config.sendCallbackAccumulatedMessages=false] - whether the callback package includes the accumulated messages
 * @param {Function} [config.onAccumulomatorStop] - called with the accumulomator name after a manual stop took effect
 * @returns {object} object with receiveMessage and shutDownWarehouse
 * @throws {Error} when the configuration is missing or malformed
 */
const makeAccumulomatorWarehouse = (config) => {
    if (typeof config !== 'object' || config === null) throw new Error('Must specify configuration object.') ;
    if (typeof config.callback !== 'function') throw new Error('Must specify callback function.') ;
    if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
    if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
    if (typeof config.messageRouter !== 'function') throw new Error('Must specify message router function.') ;
    if (typeof config.sendCallbackAccumulatedMessages !== 'undefined' && typeof config.sendCallbackAccumulatedMessages !== 'boolean') throw new Error('Must specify whether or not to send callback accumulated messages as a boolean (if unspecified, accumulated messages will not be included).') ;
    if (typeof config.onAccumulomatorStop !== 'function' && typeof config.onAccumulomatorStop !== 'undefined') throw new Error('If defined at all, onAccumulomatorStop must be a function.') ;

    const sendCallbackAccumulatedMessages = (typeof config.sendCallbackAccumulatedMessages !== 'undefined' ? config.sendCallbackAccumulatedMessages : false) ;

    const accumulomatorWarehouse = {} ;

    // A Map keeps arbitrary router-supplied names (e.g. "constructor") from
    // colliding with inherited object properties.
    const accumulomators = new Map() ;

    let warehouseIsShuttingDown = false ;

    accumulomatorWarehouse.receiveMessage = (message) => {
        // Declared locally so that each accumulomator's callbacks report their own
        // name, not the name of whichever message was routed most recently.
        const accumulomatorName = config.messageRouter(message) ;
        if (typeof accumulomatorName !== 'string') throw new Error('The value returned from messageRouter must be a string with a unique identifier.') ;

        // A stopped accumulomator is discarded so a new burst gets a fresh one.
        if (accumulomators.has(accumulomatorName) && accumulomators.get(accumulomatorName).isActive() === false) accumulomators.delete(accumulomatorName) ;

        if (!accumulomators.has(accumulomatorName)) {
            // While shutting down, messages for unknown names are dropped.
            if (warehouseIsShuttingDown === true) return ;
            const accumulomator = makeAccumulomator({
                callback: () => {
                    const accumulomatorWarehousePackage = { key: accumulomatorName } ;
                    if (sendCallbackAccumulatedMessages === true) accumulomatorWarehousePackage.messages = accumulomator.getAccumulatedMessages() ;
                    config.callback(accumulomatorWarehousePackage) ;
                } ,
                millisecondsBetweenChecks: config.millisecondsBetweenChecks ,
                onStop: () => {
                    // onAccumulomatorStop is optional; only call it when it was supplied.
                    if (typeof config.onAccumulomatorStop === 'function') config.onAccumulomatorStop(accumulomatorName) ;
                }
            }) ;
            accumulomators.set(accumulomatorName, accumulomator) ;
        }

        accumulomators.get(accumulomatorName).receiveMessage(message) ;
    } ;

    // Every 10 seconds, forget accumulomators that have stopped. The cycle ends
    // once the warehouse is shutting down and nothing is left to forget.
    const periodicallyRetireAccumulomators = () => {
        accumulomators.forEach((accumulomator, accumulomatorName) => {
            if (accumulomator.isActive() === false) accumulomators.delete(accumulomatorName) ;
        }) ;
        if (!(warehouseIsShuttingDown === true && accumulomators.size === 0)) setTimeout(periodicallyRetireAccumulomators, 10000) ;
    } ;
    periodicallyRetireAccumulomators() ;

    accumulomatorWarehouse.shutDownWarehouse = () => {
        accumulomators.forEach((accumulomator) => {
            if (accumulomator.isActive() === true) accumulomator.stopOnNextCheck() ;
        }) ;
        warehouseIsShuttingDown = true ;
    } ;

    return accumulomatorWarehouse ;
} ;

const myAccumulomatorWarehouse = makeAccumulomatorWarehouse({
    callback: (accumulomatorWarehousePackage) => console.log('Done with accumulomator.', accumulomatorWarehousePackage.key, accumulomatorWarehousePackage.messages) ,
    millisecondsBetweenChecks: 2000 ,
    messageRouter: (message) => message ,
    sendCallbackAccumulatedMessages: true ,
    onAccumulomatorStop: (accumulomatorName) => console.log('Accumulomator for ' + accumulomatorName + ' manually stopped')
}) ;

myEmitter.on('event', myAccumulomatorWarehouse.receiveMessage) ;

setTimeout(myAccumulomatorWarehouse.shutDownWarehouse, 10000) ;
Run Code Online (Sandbox Code Playgroud)