小编Sha*_*gab的帖子

Spring rabbit 重试传递被拒绝的消息..可以吗?

我有以下配置

spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=3
spring.rabbitmq.listener.retry.max-interval=1000
spring.rabbitmq.listener.default-requeue-rejected=false //I have also changed it to true but the same behavior still happens
Run Code Online (Sandbox Code Playgroud)

在我的监听器中,我抛出异常AmqpRejectAndDontRequeueException 来拒绝消息并强制兔子不要尝试重新传递它......但是兔子重新传递它3 次,然后最终将它路由到死信队列。

这是根据我提供的配置的标准行为还是我错过了什么?

rabbitmq spring-rabbit spring-amqp spring-boot

3
推荐指数
1
解决办法
6879
查看次数

spring amqp 通过配置启用重试,并根据指定的异常阻止重试

我有以下两种情况

  1. 如果出现ExceptionA:重试有限次数,最后当重试次数耗尽时,消息将写入死信队列
  2. 在ExceptionB的情况下:简单来说,消息应该写入死信队列

我想在同一个侦听器容器工厂和同一个队列上支持这两种情况。

我已经有以下配置来成功支持案例 1

@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 10000)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }




@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  factory.setMaxConcurrentConsumers(8);
  factory.setAdviceChain(workMessagesRetryInterceptor());


  return factory;
}`
Run Code Online (Sandbox Code Playgroud)

现在我想扩展之前的配置来支持情况 2。


编辑,感谢加里的快速回复。

这是我的新配置,但我仍然对两个异常进行重试: ListenerExecutionFailedException 、 AmqpRejectAndDontRequeueException

@Bean
    public SimpleRetryPolicy rejectionRetryPolicy(){

        Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<Class<? extends Throwable> , Boolean>();
        exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable
        exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable


        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5 , …
Run Code Online (Sandbox Code Playgroud)

spring-amqp spring-retry

1
推荐指数
1
解决办法
3356
查看次数

spring 数据弹性搜索 不是有效的协议版本:这不是 HTTP 端口

我的测试用例中有以下弹性搜索容器配置

    @Container
    public static  GenericContainer container = new GenericContainer<>("elasticsearch:7.7.0")
        .withExposedPorts(9200,9300).withEnv("discovery.type","single-node")
        .withNetwork(Network.newNetwork())
        .withNetworkAliases("someNetwork");
Run Code Online (Sandbox Code Playgroud)

在@BeforeAll 注释方法中,我像这样弹性搜索 url 属性

System.setProperty("spring.data.elasticsearch.cluster-nodes", container.getContainerIpAddress() + ":" + container.getMappedPort(9300));
Run Code Online (Sandbox Code Playgroud)

当我检查正在运行的容器时(在测试用例调试暂停期间),从 power shell 中,我在端口列下发现了类似的内容:0.0.0.0 : 32844->9200 / tcp, 0.0.0.0:32843->9300/tcp 当我打印container.getContainerIpAddress() + ":" + container.getMappedPort(9300),我在容器端口列中得到了映射到 9300 的相同端口,在本例中为localhost:32843,确保端口是随机的并且在每个新运行。

当代码`conf = repo.save(conf); 运行,我得到以下异常:

引起:org.apache.http.ProtocolException:无效的协议版本:这不是 HTTP 端口 在 org.apache.http.impl.nio.codecs.AbstractMessageParser.parse(AbstractMessageParser.java:209) 在 org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:245) 在 org.apache.http .impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) 在 org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) 在 org.apache.http.impl.nio .reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) 在 org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) 在 org.apache.http.impl.nio.reactor.AbstractIOReactor .processEvent(AbstractIOReactor.java:337) 在 org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) 在 org.apache.http.impl.nio.reactor。AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker。 run(AbstractMultiworkerIOReactor.java:591) at java.base/java.lang.Thread.run(Thread.java:834) …

spring-data-elasticsearch testcontainers

0
推荐指数
1
解决办法
1751
查看次数