我正在使用node-amqp库连接到在线stomp服务,在我使用stomp-client之前,它连接非常成功,但它不支持自动故障检测和重新连接,所以我想切换到node-amqp 提供更强大的支持。
var amqp = require('amqp');
var option = {
host: 'host'
, port: 61618
, login: 'my username'
, password: 'my password'
};
var implOpts = {
reconnect: true,
reconnectBackoffStrategy: 'exponential',
reconnectBackoffTime: 500
};
var connection = amqp.createConnection(option,implOpts);
connection.addListener('ready', function(){
console.log('ready connection ');
});
connection.on('error', function (error) {
console.log('Connection error' ,error);
});
connection.on('close', function () {
console.log('Connection close ');
});
Run Code Online (Sandbox Code Playgroud)
主机名、密码、用户名和端口正确并且在 stomp-client 库示例中工作。但是,通过使用上面的代码,我收到一条错误消息:连接错误 { message: '连接结束:可能是由于身份验证失败。' }。我查看了代码,没有发现我的身份验证或代码有任何问题。
这是 stomp-client 库中的工作代码。
var StompClient = …Run Code Online (Sandbox Code Playgroud) 我将 node-amqp 与 Node.js 结合使用来消耗队列中的消息。我们的管理员有 60 分钟的空闲连接超时,这会导致绑定额外的队列并孤立我之前创建的队列的先前通道。我的日志如下所示,请注意每小时绑定一个额外队列(下一次将是 3 个,然后是 4 个,依此类推):
[app] 2014-08-07T16:15:25.000Z: 2014-08-07T16:15:25.174Z - debug: ConsumerTag: node-amqp-145-0.9590792271774262
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.751Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.731Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is subscribing...
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is binding...
[app] 2014-08-07T16:15:23.000Z: 2014-08-07T16:15:23.831Z - debug: AMQP Queue is initializing...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.933Z - debug: ConsumerTag: node-amqp-145-0.6444592161569744
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.658Z - debug: AMQP Queue bound successfully.
[app] …Run Code Online (Sandbox Code Playgroud) 我正在寻找推荐的方法来通过这个库处理意外的通道关闭。
我从文档中收集到有一个事件发出,但我不太清楚如何最好地检测该事件(我知道这很简单,但我对一般事件发出不是很清楚,所以请耐心等待) ,而且,我应该用它做什么并不明显。
我应该跟踪哪些订阅者使用该频道然后重新订阅他们吗?其他人做什么?
更重要的是 - 是否没有办法检查 Channel(或ConfirmedChannel)对象,并确定它是否仍然“良好”?这似乎比事件捕获方法更可取,但我似乎找不到一种方法来做到这一点(好吧,这并不完全正确 - 我已经通过检查 Channel 上的“accept”方法做了一些事情确定它是否坏了,但这看起来很黑客)。
任何指导表示赞赏。
使用 RabbitMQ Web UI,当我发布到没有当前队列绑定的主题交换时,rabbitmq 表示消息已发布但尚未路由。
使用 amqp.node,当我将队列绑定到交换并开始使用“#”(全部)使用时,我没有得到任何东西。
我期待收到之前发布但未路由的消息。这可能吗?
我想创建一个消费者来处理来自多个可变数量的源的消息,这些源动态连接或断开连接。
\n\n我需要的是每个消费者优先考虑每个来源的前 N 条消息。然后运行多个消费者来提高速度。
\n\n我一直在阅读工作队列、路由和主题的文档,以及许多其他文档,但没有确定如何实现这一点。我还做了一些测试,但没有运气。
\n\n有人可以指出我该怎么做或在哪里阅读有关它的内容吗?
\n\n- 编辑 -
\n\n队列A-----A3--A2--A1-\xe2\x94\x90
\n\nQueueB-----B3--B2--B1-\xe2\x94\xbc--------消费者
\n\n队列C-----C3--C2--C1-\xe2\x94\x98
\n\n期望的效果是每个消费者获取每个队列的第一条消息。例如:A1、B1、C1、A2、B2、C2、A3、B3、C3 等。如果创建了一个新队列 (QueueD),消费者将以相同的方式开始从该队列接收消息。
\n\n提前致谢
\n我需要一些帮助,因为我不知道该怎么做。我必须在我的 Angular 项目中使用“amqplib”。首先,我尝试使用“amqp-ts”,但是当我打开浏览器时,我遇到了错误:
经过几次尝试,我已切换到“amqplib”。但是,当我尝试调用库的函数时,出现错误:
错误发生在以下行:
if (Buffer.from && Buffer.alloc && Buffer.allocUnsafe && Buffer.allocUnsafeSlow) {
module.exports = buffer
} else {
// Copy properties from require('buffer')
copyProps(buffer, exports)
exports.Buffer = SafeBuffer
}
Run Code Online (Sandbox Code Playgroud)
同样,Buffer 没有定义。
我已经尝试了这里提到的几种解决方案:Angular 6 Uncaught ReferenceError: Buffer is not Defined,我最终得到:-installed 'buffer' package
- 在polyfills.ts中
declare var require: any;
declare var global: any;
(window as any).global = window;
// @ts-ignore
window.Buffer = window.Buffer || require('buffer').Buffer;
(window as any).process = {
version: ''
};
Run Code Online (Sandbox Code Playgroud)
-在index.html中
<script>
var global = …Run Code Online (Sandbox Code Playgroud) 如何将 NodeJS 中 amqplib 的预取计数设置为 1? 链接到 git 上的 Lib
期望的结果是消费者仅从队列中获取一条消息来处理它,并在完成后获取一条新消息。我有一个设置,其中一些消息需要很长时间才能处理,而另一些则需要很短的时间。因此,我不仅仅是想向所有消费者平等地分享这些信息。
我是AMQP / RabbitMQ新手,也是相对的Node.js新手。我可以在客户端使用amqplib NPM库吗?
我希望能够从Angular应用将消息直接推送到RabbitMQ。我已经使用Browserify模块化了很多客户端代码。我现在开始尝试RabbitMQ,并希望通过amqp协议将消息从浏览器直接推送到基于云的队列。
我已经通过NPM安装了amqplib并编写/粘贴了以下模块:
var amqp = require('amqplib/callback_api');
var push = function(){
console.log('This is the CORE queue.pusher push function being triggered');
var connString = 'amqp://username:pwd@blabla.rmq.cloudamqp.com/username';
amqp.connect(connString, function(err, conn) {
if (err){
console.log("core queue.pusher push error %s", err);
}else {
conn.createChannel(function (err, ch) {
var q = 'FatController';
var msg = 'Hello World!';
ch.assertQueue(q, {durable: false});
// Note: on Node 6 Buffer.from(msg) should be used
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent %s", msg);
});
setTimeout(function () {
conn.close();
process.exit(0) …Run Code Online (Sandbox Code Playgroud)