我是新的 docker 世界,我正在尝试访问 Windows 10 上的 RabbitMQ 管理插件。我正在关注这个。但是当我尝试“ http://container-ip:15672 ”时,我无法访问管理。
有人有解决此类问题的经验吗?
我有一台安装了 RabbitMQ 代理的服务器和两个 Celery 消费者(main1.py和main2.py),它们都连接到同一个代理。
在第一个消费者 ( main1.py ) 中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.beat', {'queue': 'print-queue'}),
],
)
app.conf.beat_schedule = {
'beat-every-10-seconds': {
'task': 'tasks.beat',
'schedule': 10.0
},
}
@app.task(name='tasks.beat', bind=True)
def beat(self):
for i in range(10):
app.send_task("tasks.print", args=[i], queue="print-queue")
return None
Run Code Online (Sandbox Code Playgroud)
在第二个消费者(main2.py)中,我实现了上面所说的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.print', {'queue': 'print-queue'}),
],
)
@app.task(name='tasks.print', bind=True)
def print(self, name):
return name
Run Code Online (Sandbox Code Playgroud)
当我启动两个 Celery …
我知道可以RABBITMQ_BASE
在 Windows 上设置环境变量来更改RabbitMQ存储/查找其所有移动位的位置。
我的问题是:是否建议始终将其设置为确定性文件夹?
我发现整个 %APPDATA% 位置似乎有问题,即使要使用特定的服务帐户,而不是Local System
其他帐户。
也许我正在回答我自己的问题,但我想知道在野外对此有何看法。
不同场景下哪个效果更好?
我知道RabbitMQ是基于AMQP协议的,并且为开发人员提供了可视化功能。
我有以下生产者代码。
message是经过一些计算得到的dict数据。我想将消息发布到队列。然后我将消息重置为空字典以进行其他一些计算。但消费者总是从队列中得到一个空的字典。我觉得消息在发布之前已重置(是异步的吗?)。如何使其同步?
message = {a big dict ...}
channel.basic_publish(exchange='',
routing_key='my_queue',
body=json.dumps(message))
message = {}
Run Code Online (Sandbox Code Playgroud) 我是 NiFi 新手,欢迎建议。
我们在许多小记录中获取从外部源发送来的数据。我正在考虑通过 RabbitMQ 将这些记录拉入 NiFi。我想将这些记录“假脱机”或“批处理”成更大的分组(可能基于记录中的某些索引),并且当一组记录达到特定大小阈值时写入到 S3。
如何在 NiFi 中最好地实现这一点?还有其他建议吗?
谢谢,加里
我想为多个队列创建单个消费者(通用监听器)。消费者应该监听多个队列。
让我们看看例子
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = "QeueueName.Instance1";
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
Run Code Online (Sandbox Code Playgroud)
我想将消费者与动态队列数关联起来,它们会不时增加,所以我将如何将消费者与未来创建的队列关联起来。我已经为此创建了一个窗口服务,所以我是否必须循环所有队列并与消费者,对于将来创建的队列,我应该将它们添加到消费者队列列表中。
使用SpringBoot。
我创建了一个 TopicExchange,它接受消息并根据消息中存在的routingKey 将它们定向到两个队列。
消息通过以下方式发送:
rabbitTemplate.convertAndSend('in-out-topic', 'inbound.queue.route.key', payload)
Run Code Online (Sandbox Code Playgroud)
收到消息:
@RabbitListener(queues = "inbound-queue")
def onInboundMessage(def message) {
try {
log.debug("Received inbound message: ${message.messageId} on inbound queue listener", message)
} catch (Exception ex) {
log.error("Inbound message exception: ${ex.getMessage()}")
return;
}
return message.payload
}
Run Code Online (Sandbox Code Playgroud)
但是当我的侦听器(消费者)收到消息时,我收到以下异常:
org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
Run Code Online (Sandbox Code Playgroud)
我只是希望消息在被消息侦听器使用时从相应的队列中删除。
我对不同的消息代理感到困惑。
我的设备正在使用 MQTT。到目前为止,我已经研究过 HiveMQ、IBM Messagesight、RabbitMQ、google pub and sub、AWS SQS。
HiveMQ(MQTT消息代理)和RabbitMQ(或pub and sub,SQS)有什么区别?
除了协议和成本可能不同之外,它们的功能有什么区别吗?还有一个问题,IBM messagesight 是一种消息代理吗?它与 google pub and sub 或rabbit 有什么不同吗?
我找不到任何有关 messagesight 的信息。
我有在 Windows 机器上安装rabbitmq 的 c# 代码。跑完之后rabbitmq-service install
我rabbitmq-service start
就跑了rabbitmq-plugins enable rabbitmq_management
。最后我得到:
设置3个插件。离线更改;更改将在代理重启时生效。
我想知道是否可以避免这种重启。这会花费大量时间并减慢安装过程。我很确定在我的测试过程中至少有一次我不需要这样做,但我无法重现......
rabbitmq ×10
python ×2
apache-kafka ×1
apache-nifi ×1
celery ×1
celerybeat ×1
dockerhub ×1
hivemq ×1
installation ×1
mqtt ×1
pika ×1
plugins ×1
spring-amqp ×1
task ×1
windows ×1