标签: spring-rabbitmq

如何用spring-rabbit配置RabbitMQ连接?

我正在按照本指南学习如何使用spring-rabbitRabbitMQ.但是在本指南中,RabbitMQ配置是默认配置(localhost服务器,凭证为guest/guest).如果我想用ip地址和凭证连接到远程RabbitMQ,我该怎么办?我不知道在我的应用程序中将这些信息设置在何处.

rabbitmq spring-rabbit spring-amqp spring-boot spring-rabbitmq

19
推荐指数
1
解决办法
2万
查看次数

Spring RabbitMQ - 在具有@RabbitListener配置的服务上使用手动通道确认

如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}
Run Code Online (Sandbox Code Playgroud)

我的RabbitConfiguration如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory; …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-rabbit spring-amqp spring-rabbitmq

14
推荐指数
2
解决办法
2万
查看次数

无法连接Spring AMQP/Rabbit MQ:org.springframework.amqp.AmqpConnectException:java.net.ConnectException:连接被拒绝:连接

我是Spring AMQP/Rabbit MQ的新手.

我在我的项目中使用Spring AMQP/Rabbit MQ.运行tomcat后我遇到以下错误:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - 消费者引发异常,如果连接工厂支持,则处理可以重新启动.

异常摘要:org.springframework.amqp.AmqpConnectException:java.net.ConnectException:连接被拒绝:连接

以下是配置文件:

弹簧amqp.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:template connection-factory="connectionFactory" id="rabbitTemplate" channel-transacted="true"/>
    <rabbit:queue name="proposalQueue" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="listener" queue-names="proposalQueue"/>
    </rabbit:listener-container>

    <bean id="rabbitMQTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <rabbit:direct-exchange name="myExchange">
        <rabbit:bindings>
             <rabbit:binding queue="proposalQueue" key="userMesssage" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="listener" class="com.xxx.xxxx.rabbitmq.QueueServer"/>
 </beans>
Run Code Online (Sandbox Code Playgroud)

QueueServer.java

@Override
    public void onMessage(Message message) {

    Map<String, Object> result = new HashMap<>();       
    MessageProperties props = message.getMessageProperties();
    BasicProperties replyProps = new BasicProperties.Builder().correlationId(new …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

13
推荐指数
2
解决办法
3万
查看次数

多个带弹簧启动的Rabbitmq队列

从春季启动教程:https: //spring.io/guides/gs/messaging-rabbitmq/

他们给出了仅创建1个队列和1个队列的示例,但是,如果我希望能够创建多于1个队列,该怎么办?怎么可能?

显然,我不能只创建两次相同的bean:

@Bean
Queue queue() {
    return new Queue(queueNameAAA, false);
}

@Bean
Queue queue() {
    return new Queue(queueNameBBB, false);
}
Run Code Online (Sandbox Code Playgroud)

你不能两次创建相同的bean,它会使模糊不清.

rabbitmq spring-amqp spring-boot rabbitmq-exchange spring-rabbitmq

11
推荐指数
1
解决办法
6169
查看次数

如何在春季启动测试中模拟spring amqp/rabbit

如何模拟spring rabbitmq/amqp,以便在尝试自动创建交换/队列时在Spring Boot Test期间不会失败?

鉴于我有一个简单的RabbitListener,将导致队列和交换自动创建如下:

@Component
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "myqueue", autoDelete = "true"), 
                exchange = @Exchange(value = "myexchange", autoDelete = "true", type = "direct"), 
                key = "mykey")}
)
@RabbitListenerCondition
public class EventHandler {
    @RabbitHandler
    public void onEvent(Event event) {
      ...
    }   
}
Run Code Online (Sandbox Code Playgroud)

在简单的Spring Boot Test中,如下所示:

@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = { Application.class })

    @Autowired
    private ApplicationContext applicationContext;

    @Test
    public void test() {
        assertNotNull(applicationContext);
    }

}
Run Code Online (Sandbox Code Playgroud)

它将失败:

16:22:16.527 [SimpleAsyncTaskExecutor-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer - …
Run Code Online (Sandbox Code Playgroud)

spring spring-test spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
5
解决办法
2万
查看次数

如何在Spring AMQP中使用Ack或Nack

我是Spring AMQP的新手.我有一个应用程序,它是一个生产者向另一个消费者的应用程序发送消息.

消费者收到消息后,我们将对数据进行验证.

如果数据正确,我们必须确认并且应该从队列中删除消息.如果数据不正确,我们必须NACK(否定确认)数据,以便它将在RabbitMQ中重新排队.

我碰到

**factory.setDefaultRequeueRejected(false);**(它根本不会重新排列消息)

**factory.setDefaultRequeueRejected(true);**(它会在发生异常时重新排列消息)

但我的情况是,我将基于验证确认该消息.然后它应该删除该消息.如果NACK然后重新排列该消息.

我在RabbitMQ网站上看过

AMQP规范定义了basic.reject方法,该方法允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们

如何实现上述场景?请给我一些例子.

我尝试了一个小程序

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));
Run Code Online (Sandbox Code Playgroud)

消息不是为不同的异常factory.setDefaultRequeueRejected(true)重新排队

09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'部署的进程

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler](SimpleAsyncTaskExecutor-1)从错误队列收到:{ERROR =无法提交JPA事务; 嵌套异常是 javax.persistence.RollbackException:标记为rollbackOnly的事务 }

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
1
解决办法
1万
查看次数

使用兔子监听器的Spring amqp转换器问题

我想我在这里遗漏了一些东西......我正在尝试创建简单的兔子列表器,它可以接受自定义对象作为消息类型.现在按照医生的说法

在1.6之前的版本中,转换JSON的类型信息必须在消息头中提供,或者需要自定义ClassMapper.从版本1.6开始,如果没有类型信息头,则可以从目标方法参数推断出类型.

我在仪表板中使用rabbit mq adm手动将消息放入队列,得到错误

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, amqp_deliveryTag=1, amqp_consumerQueue=customer, amqp_redelivered=false, id=81e8a562-71aa-b430-df03-f60e6a37c5dc, amqp_consumerTag=amq.ctag-LQARUDrR6sUcn7FqAKKVDA, timestamp=1485635555742}]
Run Code Online (Sandbox Code Playgroud)

我的配置:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new    CachingConnectionFactory("localhost");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test1234");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
Run Code Online (Sandbox Code Playgroud)

还有问题是这个异常消息没有放回队列中. …

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
1
解决办法
7946
查看次数

带有弹簧靴的rabbitmq中的例外情况

当我用rabbitmq启动spring boot应用程序时,我反复得到以下异常.即使有以下例外情况,整个流程也能正常运行.是通过手段自动删除吗?

    08 Jul 2015 16:20:17,652 [ERROR] [SimpleAsyncTaskExecutor-2] SimpleMessageListenerContainer| Failed to check/redeclare auto-delete queue(s).
    java.util.concurrent.TimeoutException
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:621)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:665)
        at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081)
        at java.lang.Thread.run(Thread.java:745)

08 Jul 2015 16:52:47,148 [WARN ] [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer| Consumer raised exception, processing can restart if the connection factory supports it
java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111) …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-rabbit spring-rabbitmq

8
推荐指数
1
解决办法
2万
查看次数

Spring AMQP-发送者和接收消息

我在从RabbitMQ接收消息时遇到问题。我正在发送如下消息

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);
Run Code Online (Sandbox Code Playgroud)

如果在RabbitMQ中看到,我们将获得完全合格的类型。

在当前情况下,同一消费者有n个生产者。如果我使用任何映射器,则会导致异常。我将如何发送消息,使其不包含任何type_id,并且可以将消息作为Message对象接收,以后可以将其绑定到接收器中的自定义对象。

我收到如下消息。您能否让我知道如何使用Jackson2MessageConverter,以便消息将直接从Receiver端绑定到我的Object / HashMap。我也已经从发件人中删除了Type_ID。

消息在RabbitMQ中的外观

优先级:0 delivery_mode:2个标头:
ContentTypeId:java.lang.Object KeyTypeId:java.lang.Object content_encoding:UTF-8 content_type:application / json {“ Execution_start_time”:1473747183636,“ status”:“ SUCCESS”}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}
Run Code Online (Sandbox Code Playgroud)

连接

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }
Run Code Online (Sandbox Code Playgroud)

例外

Caused by: …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

8
推荐指数
1
解决办法
6340
查看次数

RabbitMQ如何将作业拆分为任务并处理结果

我在基于Spring的Web应用程序上有以下用例:

  • 我需要通过以下方式应用Competing Consumers EIP:队列中的消息实际上是属于同一作业的拆分任务.因此,我需要正确跟踪作业的所有任务何时完成及其完成状态,以便将场景保存为COMPLETED或FAILED,记录结果并通过例如电子邮件通知用户

所以,鉴于我上面描述的要求,我的问题是:

  1. 可以用RabbitMQ完成,如果是的话怎么办?

java spring rabbitmq spring-rabbitmq

8
推荐指数
1
解决办法
713
查看次数