我的 Spring AMQP 应用程序在启动时记录了以下异常:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'receiveMessage' with argument type = [class [B], value = [{[B@660cff44}]
Run Code Online (Sandbox Code Playgroud)
从我的搜索中我了解到这是因为类与消息类型不兼容?但是,我看不到这是哪里。
以下是相关的代码段:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
Queue queue() {
return new Queue(config.getAMQPResultsQueue(), false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(config.getAMQPResultsExchange());
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("#");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(config.getAMQPResultsQueue());
container.setMessageListener(listenerAdapter);
container.setMessageConverter(jsonMessageConverter());
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) …Run Code Online (Sandbox Code Playgroud) 根据https://groups.google.com/forum/#!topic/rabbitmq-users/vvWAymjDww4,如果队列跨多个服务器(节点)镜像,发布者写入哪个队列并不重要 - RabbitMQ 将始终将消息转发到主节点(首次创建队列的节点)。
如果是这种情况,如果每条消息最终都会路由到同一个节点,那么将负载均衡器放在节点前面有什么意义?似乎主节点将始终承担整个负载。
我安装了 RabbitMQ,它运行良好。然后,一旦我的系统内存已满并且它停止工作。我清除了系统内存并再次启动了RabbitMQ,但它没有启动。以下是我得到的状态。
root@qa-development-vm:/usr/# systemctl status rabbitmq-server.service
? rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled)
Active: activating (start) since Thu 2018-12-06 06:13:31 UTC; 641ms ago
Process: 32243 ExecStop=/bin/sh -c while ps -p $MAINPID >/dev/null 2>&1; do sleep 1; done (code=exited, status=0/SUCCESS)
Process: 32105 ExecStop=/usr/lib/rabbitmq/bin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
Main PID: 32253 (beam.smp)
CGroup: /system.slice/rabbitmq-server.service
??32253 /usr/lib/erlang/erts-10.1.3/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home ...
??32465 erl_child_setup 1024 …Run Code Online (Sandbox Code Playgroud) 我希望能够在无法连接到 rabbitmq 时重新启动 golang docker 文件,如下所述:(Docker Compose 在启动 Y 之前等待容器 X,请参阅:svenhornberg 的回答)。不幸的是,我的 golang 容器将退出但永远不会重新启动,我不知道为什么。
Docker-撰写:
version: '3.3'
services:
mongo:
image: 'mongo:3.4.1'
container_name: 'datastore'
ports:
- '27017:27017'
rabbitmq:
restart: always
tty: true
image: rabbitmq:3.7-management-alpine
hostname: "rabbit"
ports:
- "15672:15672"
- "5672:5672"
labels:
NAME: "rabbitmq"
volumes:
- ./rabbitmq-isolated.conf:/etc/rabbitmq/rabbitmq.config
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:15672"]
interval: 3s
timeout: 5s
retries: 20
api:
restart: always
tty: true
container_name: 'api'
build: '.'
working_dir: /go/src/github.com/patientplatypus/project
ports:
- '8000:8000'
volumes:
- './:/go/src/github.com/patientplatypus/project'
- './uploads:/uploads'
- …Run Code Online (Sandbox Code Playgroud) 我通过以下方式在运行 OS X 的本地机器上安装了 Rabbit MQ:
brew install rabbitmq
我通过以下方式启动和停止服务器:
brew services start/stop rabbitmq
我通过以下方式安装了 Web UI 插件:
rabbitmq-plugins enable rabbitmq_management
我希望在浏览器中查看 Web UI:
它不加载。报告的错误是 431,这意味着请求头字段太大。 https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/431
我正在尝试使用 amqp 库构建一个简单的 node.js 客户端,它打开一个连接,然后打开一个到 RabbitMQ 服务器的通道。我想重用相同的连接和通道来发送多条消息。主要问题是,我不想在 ceateChannel() 函数的回调函数中编写我的整个代码。
如何在回调函数之外重用通道并确保在使用通道之前回调函数已完成?
我已经尝试了回调方式和承诺方式,但我无法让它们中的任何一个工作。使用回调方法时,我遇到了所描述的问题。
使用 promise 时,我遇到的问题是我无法在 .then() 函数之外保留连接和通道的引用,因为在设置连接和通道后传递的变量会被破坏。
amqp.connect('amqp://localhost', (err, conn) => {
if (err !== null) return console.warn(err);
console.log('Created connection!');
conn.createChannel((err, ch) => {
if (err !== null) return console.warn(err);
console.log('Created channel!');
//this is where I would need to write the code that uses the variable "ch"
//but I want to move the code outside of this structure, while making sure
//this callback completes before I try using "ch"
});
}); …Run Code Online (Sandbox Code Playgroud) 我正在使用 Rabbitmq 进行一个项目,我正在使用 RPC 模式,基本上我正在接收或使用来自队列的消息,进行一些处理,然后发送回响应。我使用 Pika,我的目标是为每个任务使用一个线程,因此对于每个任务,我将专门为该任务创建一个线程。我还读到,最佳实践是只建立一个连接,并根据需要在其下建立多个通道,但我总是收到此错误:
“可能无法从 pika.exceptions.RecursionError 的范围内调用 start_sumption”:start_sumption 可能不会从另一个 BlockingConnection 或 BlockingChannel 回调的范围内调用。
我做了一些研究,发现 Pika 不是线程安全的,我们应该为每个线程使用一个独立的连接和一个通道。但我不想这样做,因为它被认为是不好的做法。所以我想在这里问一下是否有人已经实现了这项工作。我还读到,如果我不使用 BlockingConnection 来实例化我的 Connection 并且有一个名为 add_callback_threadsafe 的函数可以使这成为可能。但不幸的是没有示例,我阅读了文档,但它很复杂,如果没有示例,我很难理解他们想要描述的内容。
我的尝试是声明两个类。每个类将代表一个任务执行器,它接收或使用来自队列的消息,并基于此进行一些处理并返回响应。我的想法是在两个任务之间共享一个rabbitmq 连接,但每个任务都会获得一个独立的通道。在上面的代码中,传递给函数的 rabbit 参数是一个类,其中包含一些变量,如 Connection 和其他函数,如 EventSubscriber,当调用它时,它将分配一个新通道并开始使用来自该特定交换和路由密钥的消息。接下来我声明一个线程并将订阅或消费函数作为该线程的目标。另一个任务类看起来也和这个类一样,这就是为什么我只上传这个代码。
On_Deregistration 类:
def __init__(self, rabbit):
self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq
def event(self, rabbit):
self.Subscriber = rabbit.EventSubscriber(rabbit, 'testing.test', 'test', False, onDeregistrationFromHRS # this func is task listener)
def subscribeAsync(self):
self.Subscriber.subscribe() # here i …Run Code Online (Sandbox Code Playgroud) 我尝试使用我上面写的技术运行简单的测试项目,但遇到奇怪的异常并且找不到解决方案。我使用 Windows 7 并且 docker 在主机上工作:192.168.99.100。rabbitmq 的管理面板通过地址http://192.168.99.100:15672,运行良好。
应用特性:
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=15672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.queue
Run Code Online (Sandbox Code Playgroud)
成分:
@Component
public class Consumer {
@RabbitListener(queues="${jsa.rabbitmq.queue}")
public void recievedMessage(String msg) {
System.out.println("Recieved Message: " + msg);
}
}
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪:
2019-09-14 16:00:54.677 ERROR 4532 --- [68.99.100:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_201]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_201]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_201]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_201]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_201]
at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_201]
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_201]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-4.0.2.jar:4.0.2]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-4.0.2.jar:4.0.2] …Run Code Online (Sandbox Code Playgroud) 我是 LDAP 新手。
到目前为止,我已经成功地使用 LDAP 配置了 rabbitMQ 并对其进行了身份验证,如果它是针对单个 AD 的。我正在使用以下配置:
RabbitMQ 配置文件,
auth_backends,[{rabbit_auth_backend_ldap, rabbit_auth_backend_internal},rabbit_auth_backend_internal]
Run Code Online (Sandbox Code Playgroud)
在 RabbitMQ 管理中,我手动创建了一个没有设置密码的用户名(它有效)
但是,让我们说
我有一个 AD 组(称为“Rabbit 用户组”),里面有 3 个用户(User1、User2、User3)。
“Rabbit User Group”的位置在:sample.companyname.com > City Name (OU) > Groups (OU) > IT Groups (OU) > “Rabbit User Group”(安全组)。
我可以知道我应该如何在 RabbitMQ 管理和配置文件中配置它,以便一旦我更新特定组,组内的所有成员都将进行身份验证并具有相同的权限(例如,只有该组具有管理员权限) )?而且我不需要在rabbitMQ管理中手动创建每个单独的用户进行身份验证?。
我已将以下内容添加到我的 rabbitmq 配置文件中
{
tag_queries, [
{administrator,{in_group,'CN="Rabbit User Group",OU="City Name", OU=Groups, OU="IT Group",DC=sample,DC=companyname,DC=com',"uniqueMember"}},
{management, {constant, true}}
]
}
Run Code Online (Sandbox Code Playgroud)
并尝试在没有密码的情况下在rabbitMQ管理中创建一个名为“Rabbit User Group”的用户名。但是当我尝试以“User1”身份登录时,却无法登录。
我尝试了几种方法,但我不确定我遗漏了哪一部分,而且它们似乎都不起作用。
这是我的整体配置文件。
[
{
rabbit,
[
{
auth_backends,[{rabbit_auth_backend_ldap, rabbit_auth_backend_internal},rabbit_auth_backend_internal]
}
]
},
{ …Run Code Online (Sandbox Code Playgroud) rabbitmq ×10
docker ×2
.net-core ×1
amqp ×1
c# ×1
erlang ×1
go ×1
java ×1
javascript ×1
ldap ×1
macos ×1
macos-mojave ×1
masstransit ×1
node.js ×1
pika ×1
promise ×1
python ×1
restart ×1
spring ×1
spring-amqp ×1
spring-boot ×1