rvr*_*bit 5 javascript node.js eventemitter
我的第一个问题!我真的很想问它,所以如果我能更好地问它,请帮助我改进它.
这是我找到的唯一一个稍微相关的问题,但我不知道如何把它和我想做的事情联系起来(他们的问题是针对jQuery的;我的问题大体上是针对Node.js的[不过我找到了一个浏览器版本的EventEmitter,并且能够在浏览器中测试]): 使用jQuery为每个事件突发运行一次函数
我知道一个过程会在一段时间内发出一连串事件.
为了模拟这个过程,我编写了这段代码:
/*******************************************************/
/* This part taken directly from */
/* https://nodejs.org/api/events.html */
/* (with addition of "burstID") */
/* */ /* */
/* */ const EventEmitter = require('events'); /* */
/* */ /* */
/* Bare EventEmitter subclass; it adds no behaviour of its own. */
/* */ class MyEmitter extends EventEmitter {} /* */
/* */ /* */
/* Emitter used by the burst simulation; each 'event' is echoed to the console. */
/* */ const myEmitter = new MyEmitter(); /* */
/* */ myEmitter.on('event', function logBurstEvent(burstID) { /* */
/* */   console.log('an event occurred!', burstID); /* */
/* */ }); /* */
/* */ /* */
/*******************************************************/
// Timing knobs for the simulation, all in milliseconds.
const millisecondsToSustainBurst = 3000; // how long one burst keeps (possibly) emitting
const millisecondsBetweenPossibleEventEmissions = 200; // delay between emission attempts within a burst
const millisecondsUntilStartNextBurst = 5000; // delay between the starts of consecutive bursts
const millisecondsUntilNoMoreBursts = 23000; // after this much time no further burst is started

// Announce when the simulation starts and roughly when it is expected to end.
const now = new Date();
console.log(
  `Time now: ${now}; should run until about ${new Date(now.getTime() + millisecondsUntilNoMoreBursts)}`,
);
/**
 * Runs one step of a simulated burst: emits 'event' on `myEmitter` when a
 * coin flip succeeds, then re-schedules itself until the burst has been
 * running for longer than `millisecondsToSustainBurst`.
 *
 * @param {Date} startTimestamp - when this burst started
 * @param {number} millisecondsToSustainBurst - how long the burst may keep re-scheduling itself
 * @param {*} burstID - payload passed along with every emitted 'event'
 */
const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
  const coinFlipSucceeded = Math.random() > 0.5;
  if (coinFlipSucceeded) {
    myEmitter.emit('event', burstID);
  }

  // Stop re-scheduling once the burst has outlived its allotted duration.
  const elapsedMilliseconds = new Date() - startTimestamp;
  if (elapsedMilliseconds > millisecondsToSustainBurst) {
    return;
  }

  setTimeout(
    () => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID),
    millisecondsBetweenPossibleEventEmissions,
  );
};
/**
 * Starts a new random burst every `millisecondsUntilStartNextBurst` until
 * more than `millisecondsUntilNoMoreBursts` have passed since
 * `startTimestamp`; at that point `callback` is invoked instead.
 *
 * Each burst lasts for the file-level `millisecondsToSustainBurst` and is
 * labelled with the wall-clock time at which it started.
 *
 * @param {Date} startTimestamp - when the whole simulation started
 * @param {number} millisecondsUntilStartNextBurst - delay before the next burst is started
 * @param {number} millisecondsUntilNoMoreBursts - total time during which bursts may start
 * @param {Function} callback - called with no arguments once the window has elapsed
 */
const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
  const elapsedMilliseconds = new Date() - startTimestamp;
  if (elapsedMilliseconds > millisecondsUntilNoMoreBursts) {
    callback();
    return;
  }

  const now = new Date();
  console.log(
    `Time now: ${now}; starting random-event burst which will run for ${millisecondsToSustainBurst / 1000} seconds. `,
  );

  // Queue the next burst before starting this one.
  setTimeout(
    () => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback),
    millisecondsUntilStartNextBurst,
  );

  const burstID = `burstThatStartedAt${now.getHours()}h${now.getMinutes()}m${now.getSeconds()}s`;
  doRandomEmitBurst(new Date(), millisecondsToSustainBurst, burstID);
};
// Start the simulation now; the last argument reports when no more bursts will be started.
doRegularRandomBursts(
  new Date(),
  millisecondsUntilStartNextBurst,
  millisecondsUntilNoMoreBursts,
  () => { console.log(`Done at ${new Date()}`); },
);

// Second emitter, intended to announce that a burst has been detected and has ended.
const myBurstDetectedEmitter = new MyEmitter();
// Open question: what goes HERE in order to say:
//   - a number of events occurred within a 5-second period,
//   - they have now stopped,
//   - therefore a different kind of event should be emitted?
Run Code Online (Sandbox Code Playgroud)
现在,让我们说我想听听这些爆发的发生.
我想在采取进一步行动之前确保爆发已经结束.
我该怎么办?
首先,我可以创建一个全局"var"(哎 - 我想避免变体),如下所示:
// Registry mapping a key (e.g. a burst ID) to the Date of its most recent event.
// A null-prototype object is used so that keys such as "constructor" or
// "toString" do not appear to be present already, and `const` suffices
// because the binding itself is never reassigned (only its entries change).
const timeLastUpdated = Object.create(null) ;
Run Code Online (Sandbox Code Playgroud)
…然后…
/**
 * Polls the `timeLastUpdated` registry until the entry for a key stops
 * changing between two consecutive checks, then removes the entry and
 * invokes `callback`.
 *
 * The comparison is by identity (`===`): the entry counts as unchanged only
 * if the registry still holds the very same value that was captured when
 * the check was scheduled.
 *
 * @param {string} keyForUpdateCheck - registry key to watch
 * @param {Function} callback - called with no arguments once the entry has gone quiet
 * @param {number} [millisecondsBetweenChecks=5000] - delay between consecutive checks
 */
function keepCheckingTimeLastUpdated(keyForUpdateCheck, callback, millisecondsBetweenChecks = 5000) {
  // Snapshot the current entry; if there is none yet, a fresh Date is used,
  // which can never be identical to whatever gets stored later.
  const hasEntry = typeof timeLastUpdated[keyForUpdateCheck] !== 'undefined';
  const timestampAtScheduling = hasEntry ? timeLastUpdated[keyForUpdateCheck] : new Date();

  setTimeout(() => {
    console.log(
      `checking if modifications to "${keyForUpdateCheck}" have occurred since ${timestampAtScheduling}`,
    );

    if (timeLastUpdated[keyForUpdateCheck] === timestampAtScheduling) {
      // Nothing new arrived during the whole interval: the burst is over.
      delete timeLastUpdated[keyForUpdateCheck];
      callback();
      return;
    }

    // The entry changed while waiting; take a new snapshot and wait again.
    keepCheckingTimeLastUpdated(keyForUpdateCheck, callback, millisecondsBetweenChecks);
  }, millisecondsBetweenChecks);
}
/**
 * Builds an event handler that records, per key, when the key was last seen
 * in `timeLastUpdated` and starts a watcher the first time a key shows up.
 *
 * The timestamp is stored BEFORE the watcher is started. Starting the
 * watcher first would make it snapshot a throwaway Date that can never be
 * identical to the stored one, so its first check could never succeed and a
 * whole check interval would be wasted.
 *
 * @returns {Function} handler taking the key (e.g. a burst ID) of the event just received
 */
const makeNotificationHandler = () => (keyForUpdateCheck) => {
  const isFirstSighting = typeof timeLastUpdated[keyForUpdateCheck] === 'undefined';

  timeLastUpdated[keyForUpdateCheck] = new Date();

  if (isFirstSighting) {
    keepCheckingTimeLastUpdated(keyForUpdateCheck, () => console.log(keyForUpdateCheck + ' changed'));
  }
};
// Route every 'event' emission (its argument is the burst ID) through a notification handler.
myEmitter.addListener('event', makeNotificationHandler());
Run Code Online (Sandbox Code Playgroud)
这感觉就像一个反模式(希望我正确地使用了这个术语).我的直觉告诉我,使用全局对象是错误的做法,应该存在一个更接近函数式编程风格的解决方案.
只对有兴趣的人:
(随意回答问题)
添加了复杂性:在我的示例代码中,"burstID"将永远不会相同,但在实际示例中,它可能是.我想要等到自上次"burstID"出现以来已经过去一定时间以便确定是否真的完成了突发变化.
对于上下文,在实际应用程序中,我使用node-postgres在PostGres数据库上设置"LISTEN" ."burstID"是一个表中的主键,也用作多个其他表中的外键.我正在监听所有使用共享密钥的表,而我收到的消息包含此密钥.
在上面评论的帮助下回答我自己的问题。
十分感谢!感谢Scott Sauyet的帮助和鼓励。
我决定创建一个我称之为“累加器”的东西:一个自动累加器。消息被路由到这些累加器,我把它们想象成仓库里拿着箱子和秒表的人。
累加器一旦被实例化,就开始盯着秒表。每次秒表走到终点时,累加器都会检查手中的消息堆是否与上次相同。如果与上次相同,累加器就把消息打包,交给仓库,然后到阳光明媚的地方退休。
完整的更新后代码如下。我欢迎任何改进代码的编辑。
// To test in a browser, use:
// https://github.com/Olical/EventEmitter
// …in place of const EventEmitter = require('events');

/*******************************************************/
/* This part taken directly from */
/* https://nodejs.org/api/events.html */
/* (with addition of "burstID") */
/* */ /* */
/* */ const EventEmitter = require('events'); /* */
/* */ /* */
/* */ class MyEmitter extends EventEmitter {} /* */
/* */ /* */
/* */ const myEmitter = new MyEmitter(); /* */
/* */ myEmitter.on('event', (burstID) => { /* */
/* console.log('an event occurred!', burstID); */
/* */ }); /* */
/* */ /* */
/*******************************************************/

// Timing knobs for the simulated bursts, all in milliseconds.
const millisecondsToSustainBurst = 3000 ;
const millisecondsBetweenPossibleEventEmissions = 200 ;
const millisecondsUntilStartNextBurst = 5000 ;
const millisecondsUntilNoMoreBursts = 23000 ;

const now = new Date() ;
console.log('Bursts starting. Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ;

/**
 * One step of a simulated burst: emits 'event' on a coin flip, then
 * re-schedules itself until the burst has run longer than
 * `millisecondsToSustainBurst`.
 */
const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
  if (Math.random() > 0.5) myEmitter.emit('event', burstID) ;
  if (
    !((new Date()) - startTimestamp > millisecondsToSustainBurst)
  ) setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ;
} ;

/**
 * Starts a burst every `millisecondsUntilStartNextBurst` until more than
 * `millisecondsUntilNoMoreBursts` have elapsed since `startTimestamp`,
 * then calls `callback`.
 */
const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
  if (
    !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts)
  ) {
    const now = new Date() ;
    console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds. ') ;
    setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ;
    doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ;
  } else callback() ;
} ;

doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ;


/**
 * Creates an "accumulomator": it collects messages and, every
 * `config.millisecondsBetweenChecks`, checks whether the newest message is
 * still the one that was newest at the previous check. If so, it stops and
 * invokes `config.callback`.
 *
 * @param {object} config
 * @param {Function} config.callback - called with no arguments when no new message arrived between two checks
 * @param {number} config.millisecondsBetweenChecks - integer delay between checks
 * @param {Function} [config.onStop] - called at the first check after the accumulomator was stopped
 * @returns {object} object exposing receiveMessage, stopOnNextCheck, isActive and getAccumulatedMessages
 * @throws {Error} if the configuration is invalid
 */
const makeAccumulomator = (config) => {
  // `typeof null` is 'object', so null has to be rejected explicitly.
  if (typeof config !== 'object' || config === null) throw new Error('Must specify configuration object.') ;
  if (typeof config.callback !== 'function') throw new Error('Must specify callback function for when the end of new messages is reached.') ;
  if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
  if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
  if (typeof config.onStop !== 'function' && typeof config.onStop !== 'undefined') throw new Error('If defined at all, onStop must be a function.') ;

  const accumulomator = {} ;

  const accumulatedMessages = [] ;
  let stop = false ;

  const keepCheckingTimeLastUpdated = (callback) => {
    // Snapshot the newest timestamp (or a throwaway Date when nothing has arrived yet).
    const newestTimestampAtScheduling = (accumulatedMessages.length > 0 ? accumulatedMessages[accumulatedMessages.length - 1].timestamp : (new Date())) ;
    setTimeout(() => {
      if (stop) { if (typeof config.onStop === 'function') config.onStop() ; }
      else if (accumulatedMessages.length < 1) keepCheckingTimeLastUpdated(callback) ;
      // Identity comparison: same Date object means no message was pushed since the snapshot.
      else if (accumulatedMessages[accumulatedMessages.length - 1].timestamp === newestTimestampAtScheduling) { stop = true ; callback() ; }
      else keepCheckingTimeLastUpdated(callback) ;
    }, config.millisecondsBetweenChecks) ;
  } ;
  keepCheckingTimeLastUpdated(config.callback) ;

  accumulomator.receiveMessage = (message) => accumulatedMessages.push({ message: message, timestamp: (new Date())}) ;

  accumulomator.stopOnNextCheck = () => {
    if (stop === true) throw new Error('Accumulomator is already stopped.') ;
    else stop = true ;
  } ;

  accumulomator.isActive = () => stop === false ;

  accumulomator.getAccumulatedMessages = () => accumulatedMessages ;

  return accumulomator ;
} ;

/**
 * Creates a "warehouse" that routes each incoming message to the
 * accumulomator named by `config.messageRouter(message)`, creating
 * accumulomators on demand and discarding inactive ones.
 *
 * @param {object} config
 * @param {Function} config.callback - called with `{ key, messages? }` when an accumulomator finishes
 * @param {number} config.millisecondsBetweenChecks - integer delay handed to each accumulomator
 * @param {Function} config.messageRouter - maps a message to the (string) name of its accumulomator
 * @param {boolean} [config.sendCallbackAccumulatedMessages=false] - whether `messages` is included in the callback argument
 * @param {Function} [config.onAccumulomatorStop] - called with the accumulomator name after a manual stop
 * @returns {object} object exposing receiveMessage and shutDownWarehouse
 * @throws {Error} if the configuration is invalid
 */
const makeAccumulomatorWarehouse = (config) => {
  if (typeof config !== 'object' || config === null) throw new Error('Must specify configuration object.') ;
  if (typeof config.callback !== 'function') throw new Error('Must specify callback function.') ;
  if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
  if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
  if (typeof config.messageRouter !== 'function') throw new Error('Must specify message router function.') ;
  if (typeof config.sendCallbackAccumulatedMessages !== 'undefined') if (typeof config.sendCallbackAccumulatedMessages !== 'boolean') throw new Error('Must specify whether or not to send callback accumulated messages as a boolean (if unspecified, accumulated messages will not be included).') ;
  if (typeof config.onAccumulomatorStop !== 'function' && typeof config.onAccumulomatorStop !== 'undefined') throw new Error('If defined at all, onAccumulomatorStop must be a function.') ;

  let sendCallbackAccumulatedMessages = false ;
  if (typeof config.sendCallbackAccumulatedMessages !== 'undefined') sendCallbackAccumulatedMessages = config.sendCallbackAccumulatedMessages ;

  const accumulomatorWarehouse = {} ;

  // Null-prototype registry so that router names such as "constructor" are not mistaken for existing entries.
  const accumulomators = Object.create(null) ;

  let warehouseIsShuttingDown = false ;

  accumulomatorWarehouse.receiveMessage = (message) => {
    // Must be a local binding: the callbacks below close over it, and a shared
    // (implicitly global) variable would make them report whichever name was routed last.
    const accumulomatorName = config.messageRouter(message) ;
    if (typeof accumulomatorName !== 'string') throw new Error('The value returned from messageRouter must be a string with a unique identifier.') ;
    if (typeof accumulomators[accumulomatorName] === 'object') if (accumulomators[accumulomatorName].isActive() === false) delete accumulomators[accumulomatorName] ;
    if (typeof accumulomators[accumulomatorName] === 'undefined') {
      if (warehouseIsShuttingDown === false) {
        const newAccumulomator = makeAccumulomator({
          callback: () => {
            const objectToReturn = {} ;
            objectToReturn.key = accumulomatorName ;
            if (sendCallbackAccumulatedMessages === true) objectToReturn.messages = newAccumulomator.getAccumulatedMessages() ;
            config.callback(objectToReturn) ;
          } ,
          millisecondsBetweenChecks: config.millisecondsBetweenChecks ,
          // onAccumulomatorStop is optional, so only call it when it was supplied.
          onStop: () => {
            if (typeof config.onAccumulomatorStop === 'function') config.onAccumulomatorStop(accumulomatorName) ;
          }
        }) ;
        accumulomators[accumulomatorName] = newAccumulomator ;
      }
    }
    if (typeof accumulomators[accumulomatorName] === 'object') accumulomators[accumulomatorName].receiveMessage(message) ;
  } ;

  // Every 10 seconds, drop accumulomators that are no longer active; the loop
  // ends once the warehouse is shutting down and the registry is empty.
  const periodicallyRetireAccumulomators = () => {
    Object.keys(accumulomators).forEach((accumulomator) => {
      if (accumulomators[accumulomator].isActive() === false) delete accumulomators[accumulomator] ;
    }) ;
    if (!(warehouseIsShuttingDown === true && Object.keys(accumulomators).length === 0)) setTimeout(periodicallyRetireAccumulomators, 10000) ;
  } ;
  periodicallyRetireAccumulomators() ;

  accumulomatorWarehouse.shutDownWarehouse = () => {
    Object.keys(accumulomators).forEach((accumulomator) => {
      if (accumulomators[accumulomator].isActive() === true) accumulomators[accumulomator].stopOnNextCheck() ;
    }) ;
    warehouseIsShuttingDown = true ;
  } ;

  return accumulomatorWarehouse ;
} ;

const myAccumulomatorWarehouse = makeAccumulomatorWarehouse({
  callback: (accumulomatorWarehousePackage) => console.log('Done with accumulomator.', accumulomatorWarehousePackage.key, accumulomatorWarehousePackage.messages) ,
  millisecondsBetweenChecks: 2000 ,
  messageRouter: (message) => message ,
  sendCallbackAccumulatedMessages: true ,
  onAccumulomatorStop: (accumulomatorName) => console.log('Accumulomator for ' + accumulomatorName + ' manually stopped')
}) ;

myEmitter.on('event', myAccumulomatorWarehouse.receiveMessage) ;

setTimeout(myAccumulomatorWarehouse.shutDownWarehouse, 10000) ;
| 归档时间: |
|
| 查看次数: |
148 次 |
| 最近记录: |