我有以下配置
spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=3
spring.rabbitmq.listener.retry.max-interval=1000
spring.rabbitmq.listener.default-requeue-rejected=false //I have also changed it to true but the same behavior still happens
Run Code Online (Sandbox Code Playgroud)
在我的监听器中,我抛出异常AmqpRejectAndDontRequeueException 来拒绝消息并强制兔子不要尝试重新传递它......但是兔子重新传递它3 次,然后最终将它路由到死信队列。
这是根据我提供的配置的标准行为还是我错过了什么?
我有以下两种情况
我想在同一个侦听器容器工厂和同一个队列上支持这两种情况。
我已经有以下配置来成功支持案例 1:
@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 10000)
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMaxConcurrentConsumers(8);
factory.setAdviceChain(workMessagesRetryInterceptor());
return factory;
}`
Run Code Online (Sandbox Code Playgroud)
现在我想扩展之前的配置来支持情况 2。
编辑,感谢加里的快速回复。
这是我的新配置,但我仍然对两个异常进行重试: ListenerExecutionFailedException 、 AmqpRejectAndDontRequeueException
@Bean
public SimpleRetryPolicy rejectionRetryPolicy(){
Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<Class<? extends Throwable> , Boolean>();
exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable
exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5 , …Run Code Online (Sandbox Code Playgroud) 我的测试用例中有以下弹性搜索容器配置
@Container
public static GenericContainer container = new GenericContainer<>("elasticsearch:7.7.0")
.withExposedPorts(9200,9300).withEnv("discovery.type","single-node")
.withNetwork(Network.newNetwork())
.withNetworkAliases("someNetwork");
Run Code Online (Sandbox Code Playgroud)
在@BeforeAll 注释方法中,我像这样弹性搜索 url 属性
System.setProperty("spring.data.elasticsearch.cluster-nodes", container.getContainerIpAddress() + ":" + container.getMappedPort(9300));
Run Code Online (Sandbox Code Playgroud)
当我检查正在运行的容器时(在测试用例调试暂停期间),从 power shell 中,我在端口列下发现了类似的内容:0.0.0.0 : 32844->9200 / tcp, 0.0.0.0:32843->9300/tcp 当我打印container.getContainerIpAddress() + ":" + container.getMappedPort(9300),我在容器端口列中得到了映射到 9300 的相同端口,在本例中为localhost:32843,确保端口是随机的并且在每个新运行。
当代码`conf = repo.save(conf); 运行,我得到以下异常:
引起:org.apache.http.ProtocolException:无效的协议版本:这不是 HTTP 端口 在 org.apache.http.impl.nio.codecs.AbstractMessageParser.parse(AbstractMessageParser.java:209) 在 org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:245) 在 org.apache.http .impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) 在 org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) 在 org.apache.http.impl.nio .reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) 在 org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) 在 org.apache.http.impl.nio.reactor.AbstractIOReactor .processEvent(AbstractIOReactor.java:337) 在 org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) 在 org.apache.http.impl.nio.reactor。AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker。 run(AbstractMultiworkerIOReactor.java:591) at java.base/java.lang.Thread.run(Thread.java:834) …