标签: spring-rabbit

如何用spring-rabbit配置RabbitMQ连接?

我正在按照本指南学习如何使用spring-rabbitRabbitMQ.但是在本指南中,RabbitMQ配置是默认配置(localhost服务器,凭证为guest/guest).如果我想用ip地址和凭证连接到远程RabbitMQ,我该怎么办?我不知道在我的应用程序中将这些信息设置在何处.

rabbitmq spring-rabbit spring-amqp spring-boot spring-rabbitmq

19
推荐指数
1
解决办法
2万
查看次数

如何根据条件限制并发消息使用

场景(我简化了一些事情):

  • 许多最终用户可以从前端Web应用程序(生产者)开始工作(繁重的工作,例如渲染大型PDF).
  • 作业将发送到单个持久的RabbitMQ队列.
  • 许多工作者应用程序(使用者)处理这些作业并将结果写回数据存储区.

这个相当标准的模式工作正常.

问题是:如果用户在同一分钟内启动了10个作业,并且在一天中的那个时间只有10个工作者应用程序启动,则该最终用户有效地接管了他自己的所有计算时间.

问题:如何确保每个最终用户只能处理一个作业?(奖励:一些最终用户(例如管理员)不得受到限制)

此外,我不希望前端应用程序阻止最终用户启动并发作业.我只希望最终用户等待他们的并发作业一次完成一个.

解决方案?:我应该为每个最终用户动态创建一个自动删除独占队列吗?如果是,我如何告诉worker应用程序开始使用此队列?如何确保一个(并且只有一个)工作者将从此队列中消耗?

rabbitmq spring-rabbit

15
推荐指数
2
解决办法
2919
查看次数

Spring RabbitMQ - 在具有@RabbitListener配置的服务上使用手动通道确认

如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}
Run Code Online (Sandbox Code Playgroud)

我的RabbitConfiguration如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory; …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-rabbit spring-amqp spring-rabbitmq

14
推荐指数
2
解决办法
2万
查看次数

从spring application.properties禁用侦听rabbit队列

我想在spring中创建一个application-development.properties文件来定义开发环境.在这种环境下想要禁用监听兔子队列,因为我不想在调试等时干扰登台队列.

问题是 - 我找不到控制它的属性.没有"有效"属性或"已启用"属性或任何内容..

这些是我在Spring文档中找到的属性:

# RABBIT (RabbitProperties)
spring.rabbitmq.addresses= # connection addresses (e.g. myhost:9999,otherhost:1111)
spring.rabbitmq.dynamic=true # create an AmqpAdmin bean
spring.rabbitmq.host= # connection host
spring.rabbitmq.port= # connection port
spring.rabbitmq.password= # login password
spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none
spring.rabbitmq.listener.acknowledge-mode= # acknowledge mode of container
spring.rabbitmq.listener.concurrency= # minimum number of consumers
spring.rabbitmq.listener.max-concurrency= # maximum number of consumers
spring.rabbitmq.listener.prefetch= # number of messages to be handled in a single request
spring.rabbitmq.listener.transaction-size= # number of messages to …
Run Code Online (Sandbox Code Playgroud)

java spring spring-rabbit spring-boot

13
推荐指数
2
解决办法
6761
查看次数

以bean的形式动态添加新队列,绑定和交换

我目前正在开发一个rabbit-amqp实现项目,并使用spring-rabbit以编程方式设置我的所有队列,绑定和交换.(spring-rabbit-1.3.4和spring-framework版本3.2.0)

在我看来,javaconfiguration类或基于xml的配置中的声明都是非常静态的.我知道如何为这样的队列,交换或绑定设置更动态的值(例如名称):

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有可能创建一个未定义的Queue数量实例并将它们注册为bean,就像注册其所有实例的工厂一样.

我并不熟悉Spring @Bean注释及其局限性,但我尝试过

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   @Scope("prototype")
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}
Run Code Online (Sandbox Code Playgroud)

并查看是否已注册Queue的多个Bean实例,我调用:

Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class);
Run Code Online (Sandbox Code Playgroud)

但这只会返回1个映射:

name of the method := the last created …
Run Code Online (Sandbox Code Playgroud)

spring rabbitmq spring-rabbit spring-amqp spring-bean

11
推荐指数
1
解决办法
5517
查看次数

Spring RabbitTemplate - 如何在发送时自动创建队列

我将 RabbitMQ 与 Spring 的 RabbitTemplate 一起使用。

使用模板发送方法向队列发送消息时,如果队列不存在,我希望自动创建/声明队列。

这很重要,因为根据我们的业务逻辑队列名称是在运行时生成的,我不能提前声明它们。

以前我们使用过 JmsTemplate 并且任何发送或接收的调用都会自动创建队列。

java spring rabbitmq spring-rabbit spring-amqp

11
推荐指数
2
解决办法
8205
查看次数

如何在@rabbitlistener中使用@queuebinding?

似乎自spring-amqp 1.5版以来,有一个新的注释@ queuebinding.But如何使用它,我不知道它是否可以用于类或方法?它是否存在任何例子?

rabbitmq spring-rabbit spring-amqp

10
推荐指数
1
解决办法
9510
查看次数

如何在春季启动测试中模拟spring amqp/rabbit

如何模拟spring rabbitmq/amqp,以便在尝试自动创建交换/队列时在Spring Boot Test期间不会失败?

鉴于我有一个简单的RabbitListener,将导致队列和交换自动创建如下:

@Component
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "myqueue", autoDelete = "true"), 
                exchange = @Exchange(value = "myexchange", autoDelete = "true", type = "direct"), 
                key = "mykey")}
)
@RabbitListenerCondition
public class EventHandler {
    @RabbitHandler
    public void onEvent(Event event) {
      ...
    }   
}
Run Code Online (Sandbox Code Playgroud)

在简单的Spring Boot Test中,如下所示:

@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = { Application.class })

    @Autowired
    private ApplicationContext applicationContext;

    @Test
    public void test() {
        assertNotNull(applicationContext);
    }

}
Run Code Online (Sandbox Code Playgroud)

它将失败:

16:22:16.527 [SimpleAsyncTaskExecutor-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer - …
Run Code Online (Sandbox Code Playgroud)

spring spring-test spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
5
解决办法
2万
查看次数

如何在Spring AMQP中使用Ack或Nack

我是Spring AMQP的新手.我有一个应用程序,它是一个生产者向另一个消费者的应用程序发送消息.

消费者收到消息后,我们将对数据进行验证.

如果数据正确,我们必须确认并且应该从队列中删除消息.如果数据不正确,我们必须NACK(否定确认)数据,以便它将在RabbitMQ中重新排队.

我碰到

**factory.setDefaultRequeueRejected(false);**(它根本不会重新排列消息)

**factory.setDefaultRequeueRejected(true);**(它会在发生异常时重新排列消息)

但我的情况是,我将基于验证确认该消息.然后它应该删除该消息.如果NACK然后重新排列该消息.

我在RabbitMQ网站上看过

AMQP规范定义了basic.reject方法,该方法允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们

如何实现上述场景?请给我一些例子.

我尝试了一个小程序

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));
Run Code Online (Sandbox Code Playgroud)

消息不是为不同的异常factory.setDefaultRequeueRejected(true)重新排队

09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'部署的进程

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler](SimpleAsyncTaskExecutor-1)从错误队列收到:{ERROR =无法提交JPA事务; 嵌套异常是 javax.persistence.RollbackException:标记为rollbackOnly的事务 }

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
1
解决办法
1万
查看次数

使用兔子监听器的Spring amqp转换器问题

我想我在这里遗漏了一些东西......我正在尝试创建简单的兔子列表器,它可以接受自定义对象作为消息类型.现在按照医生的说法

在1.6之前的版本中,转换JSON的类型信息必须在消息头中提供,或者需要自定义ClassMapper.从版本1.6开始,如果没有类型信息头,则可以从目标方法参数推断出类型.

我在仪表板中使用rabbit mq adm手动将消息放入队列,得到错误

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, amqp_deliveryTag=1, amqp_consumerQueue=customer, amqp_redelivered=false, id=81e8a562-71aa-b430-df03-f60e6a37c5dc, amqp_consumerTag=amq.ctag-LQARUDrR6sUcn7FqAKKVDA, timestamp=1485635555742}]
Run Code Online (Sandbox Code Playgroud)

我的配置:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new    CachingConnectionFactory("localhost");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test1234");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
Run Code Online (Sandbox Code Playgroud)

还有问题是这个异常消息没有放回队列中. …

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
1
解决办法
7946
查看次数