ryn*_*nop 6 amazon-web-services node.js amazon-elasticache
我是一个节点菜鸟,并试图了解如何在node.js应用程序中实现自动发现.我将使用群集模块,并希望每个工作进程保持最新(并持久连接到)弹性缓存节点.
由于没有共享内存的概念(如PHP APC),您是否必须拥有在每个工作程序中运行的代码,每隔X秒唤醒一次并以某种方式更新IP列表并重新连接内存缓存客户端?
今天人们如何解决这个问题?示例代码将非常感激.
小智 3
请注意,目前自动发现仅适用于运行 memcached 引擎的缓存集群。
对于缓存引擎版本 1.4.14 或更高版本,您需要创建一个到缓存集群配置端点(或任何缓存节点端点)的 TCP/IP 套接字并发送以下命令:
config get cluster
Run Code Online (Sandbox Code Playgroud)
在 Node.js 中,您可以使用net.Socket 类来实现这一点。
回复由两行组成:
配置信息的版本号。每次在缓存集群中添加或删除节点时,版本号都会加一。
缓存节点列表。列表中的每个节点都由主机名|IP 地址|端口组表示,每个节点由空格分隔。
每行末尾出现一个回车符和一个换行符 (CR + LF)。
您可以在这里找到有关如何将自动发现添加到客户端库的更全面的说明。
使用集群模块,您需要在每个进程(即子进程)中存储相同的信息,我将使用每个子进程的“setInterval”来定期检查(例如每 60 秒)节点列表,并仅在列表发生更改时重新连接(这种情况不应该经常发生)。
您可以选择仅更新主服务器上的列表,并使用“worker.send”来更新工作人员。这可以使在单个服务器中运行的所有进程保持更加同步,但在多服务器架构中没有帮助,因此使用一致的散列非常重要,以便能够更改节点列表并释放“ memcached 集群中存储的键的最小数量。
我会使用全局变量来存储这种配置。
三思而后行,您可以使用适用于 Node.js 的 AWS 开发工具包来获取 ElastiCache 节点列表(这也适用于 Redis 引擎)。
在这种情况下,代码将类似于:
var util = require('util'),
AWS = require('aws-sdk'),
Memcached = require('memcached');
global.AWS_REGION = 'eu-west-1'; // Just as a sample I'm using the EU West region
global.CACHE_CLUSTER_ID = 'test';
global.CACHE_ENDPOINTS = [];
global.MEMCACHED = null;
function init() {
AWS.config.update({
region: global.AWS_REGION
});
elasticache = new AWS.ElastiCache();
function getElastiCacheEndpoints() {
function sameEndpoints(list1, list2) {
if (list1.length != list2.length)
return false;
return list1.every(
function(e) {
return list2.indexOf(e) > -1;
});
}
function logElastiCacheEndpoints() {
global.CACHE_ENDPOINTS.forEach(
function(e) {
util.log('Memcached Endpoint: ' + e);
});
}
elasticache.describeCacheClusters({
CacheClusterId: global.CACHE_CLUSTER_ID,
ShowCacheNodeInfo: true
},
function(err, data) {
if (!err) {
util.log('Describe Cache Cluster Id:' + global.CACHE_CLUSTER_ID);
if (data.CacheClusters[0].CacheClusterStatus == 'available') {
var endpoints = [];
data.CacheClusters[0].CacheNodes.forEach(
function(n) {
var e = n.Endpoint.Address + ':' + n.Endpoint.Port;
endpoints.push(e);
});
if (!sameEndpoints(endpoints, global.CACHE_ENDPOINTS)) {
util.log('Memached Endpoints changed');
global.CACHE_ENDPOINTS = endpoints;
if (global.MEMCACHED)
global.MEMCACHED.end();
global.MEMCACHED = new Memcached(global.CACHE_ENDPOINTS);
process.nextTick(logElastiCacheEndpoints);
setInterval(getElastiCacheEndpoints, 60000); // From now on, update every 60 seconds
}
} else {
setTimeout(getElastiCacheEndpoints, 10000); // Try again after 10 seconds until 'available'
}
} else {
util.log('Error describing Cache Cluster:' + err);
}
});
}
getElastiCacheEndpoints();
}
init();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4005 次 |
| 最近记录: |