小编Gar*_*ell的帖子

Spring AMQP:将BlockedListener注册到Connection

我正在尝试使用Spring AMQP的RabbitTemplate实现RabbitMQ的Blocked Listener.在我的代码中我使用的是Spring-amqp 1.1.3版本的jar文件,而我也查看了1.3.1版本,这个版本也不支持.有没有人知道我是否缺少任何支持在RabbitMQ中向新连接注册阻塞侦听器的版本.或者,如果有任何未来版本的spring amqp支持此功能.

示例代码:

    Connection connection = factory.newConnection();
    connection.addBlockedListener(new BlockedListener() {
     @Override
     public void handleUnblocked() throws IOException {
        System.out.println("Connection is Unblocked");
     }

     @Override
     public void handleBlocked(String arg0) throws IOException {
        System.out.println("Connection Blocked");
     }           



    });
    com.rabbitmq.client.Channel channel = connection.createChannel();    
Run Code Online (Sandbox Code Playgroud)

amqp rabbitmq spring-amqp

2
推荐指数
1
解决办法
1054
查看次数

在Spring-Boot-RabbitMQ中处理连接

嗨,我正在开发Spring-boot-RabbitMQ版本1.6。在开发应用程序时,我遇到的查询很少。阅读文档并浏览了其他堆栈溢出问题,但是我无法弄清楚几件事(可能是因为我的内存不好)。如果有人回答我的问题,那就太好了。

1)目前我有4个生产者和4个生产者。生产者可能产生数百万条消息或事件,因此对生产者和消费者使用单个连接将阻止消费者消费消息。生产者和消费者,这样它们就不会阻塞并且会改善性能。我对这种方法正确吗?

2)我正在使用CachingConnectionFactory来通过SimpleRabbitListenerContainerFactory创建连接,同时调用该工厂是否会为我们返回新的连接,因此如果我们使用CachingConnectionFactory,我们是否真的需要为Producer和Consumer编写一个单独的连接工厂。请在下面找到我的

1)配置类

@Configuration
@EnableRabbit
public class RabbitMqConfiguration{

@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Value("${concurrent.consumers}")
public int concurrent_consumers;

@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;

 @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setConcurrentConsumers(concurrent_consumers);
        factory.setMaxConcurrentConsumers(max_concurrent_consumers);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

@Bean
public MessageConverter jsonMessageConverter()
{
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}

}
Run Code Online (Sandbox Code Playgroud)

2)生产者阶层

@Configuration
public class TaskProducerConfiguration extends RabbitMqConfiguration {

@Value("${queue1}")
public String queue1;

@Value("${queue2}")
public String queue2;

@Value("${queue3}")
public String …
Run Code Online (Sandbox Code Playgroud)

spring-rabbit spring-amqp spring-rabbitmq

2
推荐指数
1
解决办法
3962
查看次数

Spring集成,如何使用@Transformer从/到JSON转换?

我的问题是如何在 SI 端点之间传递对象?

我发现的几乎每个示例都使用 XML 设置,我使用的是 Annotation 并且不知道如何解决此异常

Caused by: java.lang.IllegalArgumentException: Could not resolve 'json__TypeId__' in 'javaTypes'.
    at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.createJavaType(AbstractJacksonJsonObjectMapper.java:68)
    at org.springframework.integration.support.json.Jackson2JsonObjectMapper.extractJavaType(Jackson2JsonObjectMapper.java:116)
    at org.springframework.integration.support.json.Jackson2JsonObjectMapper.extractJavaType(Jackson2JsonObjectMapper.java:52)
    at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.fromJson(AbstractJacksonJsonObjectMapper.java:61)
    at org.springframework.integration.json.JsonToObjectTransformer.doTransform(JsonToObjectTransformer.java:87)
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
    ... 18 more
Run Code Online (Sandbox Code Playgroud)

人们建议使用 xml 来解决这个问题,例如

<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
    <property name="defaultType" value="foo.MyObject" />
</bean>
Run Code Online (Sandbox Code Playgroud)

但是我正在使用注释来创建转换器来处理从通道接收的消息,就像这样

    @Bean
    @Transformer(inputChannel="fromTcp", outputChannel="toHandler")
    JsonToObjectTransformer jsonToObjectTransformer() {
        ObjectMapper mapper = new ObjectMapper();       
        JsonObjectMapper<JsonNode, JsonParser> jm = new Jackson2JsonObjectMapper(mapper);
        return new JsonToObjectTransformer(jm);
    }
Run Code Online (Sandbox Code Playgroud)

实际上我不知道如何在 SI 端点之间传递对象。我只能通过 String 和 SI 使用默认序列化来处理 String 到 byte[],以及 byte[] 到 String。

json annotations spring-integration

2
推荐指数
1
解决办法
7042
查看次数

单元测试 Spring 集成流 DSL

我正在尝试对一个简单的流程进行单元测试,它检查文件是否存在,然后执行一些其他任务。

集成流程

@Bean
public IntegrationFlow initiateAlarmAck() {
    return IntegrationFlows.from("processAckAlarmInputChannel")
            .handle((payload, headers) ->  {
                LOG.info("Received initiate ack alarm request at " + payload);
                File watermarkFile = getWatermarkFile();
                if(watermarkFile.isFile()){
                    LOG.info("Watermark File exists");
                    return true;
                }else{
                    LOG.info("File does not exists");
                    return false;
                }
            })
            .<Boolean, String>route(p -> fileRouterFlow(p))
            .get();
}
File getWatermarkFile(){
    return new File(eventWatermarkFile);
}

@Router
public String fileRouterFlow(boolean fileExits){
    if(fileExits)
        return "fileFoundChannel";
    else
        return "fileNotFoundChannel";
}
Run Code Online (Sandbox Code Playgroud)

还有另一个集成流,它从中挑选消息fileNotFoundChannel并进行额外的处理。我不想对这部分进行单元测试。如何停止我的测试以不再做进一步的测试并在发布消息后停止fileNotFoundChannel

@Bean
public IntegrationFlow fileNotFoundFlow() {
    return IntegrationFlows.from("fileNotFoundChannel")
            .handle((payload, headers) -> …
Run Code Online (Sandbox Code Playgroud)

spring-integration spring-boot-test

2
推荐指数
1
解决办法
6268
查看次数

JMSListener 选择器不工作

我有一个 JMS 生产者发送两种消息:业务逻辑和心跳消息。目前,两者都由同一个接收器处理,但我现在尝试通过使用选择器为每个接收器提供专用的类。我遇到的问题是,每当我将选择器添加到接收器时,它就会停止接收消息。这是我到目前为止所拥有的。为了简单起见,我只添加了心跳的代码:

要发送消息,我有这个:

private void sendHeartBeat() {
    this.buildTemplate().send(new HeartbeatMessageCreator(this.someId));
}

private JmsTemplate buildTemplate() {
    if (this.cachedJmsTemplate == null) {
        final ActiveMQTopic activeMQTopic = new ActiveMQTopic(this.topic);
        this.cachedJmsTemplate = new JmsTemplate(this.config.getCachedConnectionFactory());
        this.cachedJmsTemplate.setDefaultDestination(activeMQTopic);
        this.cachedJmsTemplate.setPubSubDomain(true);
    }
    return this.cachedJmsTemplate;
}
Run Code Online (Sandbox Code Playgroud)

心跳消息创建者:

class HeartbeatMessageCreator implements MessageCreator {
private final String someID;

HeartbeatMessageCreator(final String someID) {
    this.someID = someID;
}

@Override
public Message createMessage(final Session session) throws JMSException {
    final Serializable message = new ZHeartBeat(this.someID);
    final Message jmsMessage = session.createObjectMessage(message);
    jmsMessage.setJMSType(message.getClass().getName());
    jmsMessage.setStringProperty("InternalMessageType", "HeartBeat"); // <-- Setting …
Run Code Online (Sandbox Code Playgroud)

java jms spring-jms

2
推荐指数
1
解决办法
1万
查看次数

ConcurrentMessageListenerContainer不是并发的

我正在尝试创建一个多线程侦听器,但所有消息都在同一个线程中执行.运行时,线程ID总是相同的,即使KafkaListerContainerFactory(正确地)是我实例化的那个.如果我几乎同时发送7条消息,我希望前三个同时处理,然后是后三个同时处理,然后是最后一个.我看到的是第一个完成的过程,然后是第二个,然后是第三个,等等.我误解了什么,或者只是错误配置?

这是我的倾听者:

@Component
public class ExampleKafkaController {
    Log log = Log.getLog(ExampleKafkaController.class);

    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @KafkaListener(topics = "${kafka.example.topic}")
    public void listenForMessage(ConsumerRecord<?, ?> record) {
        log.info("Got record:\n" + record.value());
        System.out.println("Kafka Thread: " + Thread.currentThread());
        System.out.println(kafkaListenerContainerFactory);

        log.info("Waiting...");
        waitSleep(10000);

        log.info("Done!");
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.example.topic}")
    public String topic;    

    public void send(String payload) {
        log.info("sending payload='" + payload + "' to topic='" + topic + "'");
        kafkaTemplate.send(topic, payload);
    }

    private void waitSleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) { …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
1850
查看次数

消息发送后立即检索 JMS 标头而不消耗消息

如何在发送消息后检索 JMS 消息标头但不使用该消息?

这是我的消息发送代码

jmsTemplate.convertAndSend(que, text, message -> {

       LOGGER.info("setting JMS Message header values");    
       message.setStringProperty(RequestContext.HEADER_ID, id);
     //  LOGGER.info(message.getJMSMessageId()); -- this gives a null value here
       return message;
 });
Run Code Online (Sandbox Code Playgroud)

消息头仅在消息发布到队列后生成,因此在使用 MessagePostProcessor 时是否有一种简单的方法来检索 JMS 消息头?

我已经引用了链接 -这里这里,但没有太多帮助:(

spring activemq-classic jmstemplate spring-jms

2
推荐指数
1
解决办法
3175
查看次数

Spring JMS 和 ActiveMQ 在哪里查看死信队列中的消息

这是我的配置:

@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
    String url = this.environment.getProperty("jms.broker.url");
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(url);
    connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
    return connectionFactory;
}

@Bean
public RedeliveryPolicy redeliveryPolicy() {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(5);
    return redeliveryPolicy;
}
.....
Run Code Online (Sandbox Code Playgroud)

这是我的消费者:

@Service("msgConsumer")
public class MessageConsumer {

    private static final String ORDER_RESPONSE_QUEUE = "thequeue.Q";

    @JmsListener(destination = ORDER_RESPONSE_QUEUE, containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(final Message<String> message) throws Exception {

        MessageHeaders headers =  message.getHeaders();
        LOG.info("Application : headers received : {}", headers);

        String response = message.getPayload();
        LOG.info("Application …
Run Code Online (Sandbox Code Playgroud)

activemq-classic jms message-queue dead-letter spring-jms

2
推荐指数
1
解决办法
3410
查看次数

SpringBoot + Rabbitmq - DLQ 队列不工作

我已经设置了 dlq 和 dlx,但失败的消息没有重定向到 dlq。我正在尝试从java应用程序以及从rabbitmq服务器发送消息到MESSAGES.EXCHANGE,在这两种情况下我都会收到消息,但在抛出异常消息后应该重定向到DLX.MESSAGES.EXCHANGE,但它正在发生。

下面是java代码和rabbitmq服务器的屏幕截图。一切对我来说看起来都不错。在代码或rabbitmq服务器中找不到任何问题。

队列设置代码-

public class DLQAmqpConfiguration {
    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-rabbit spring-boot

2
推荐指数
1
解决办法
3746
查看次数

Camel-spring-3.10.0.jar 中缺少camel-spring.xsd

我已经将camel版本升级到最新,发现jar文件中缺少xsd。xsd 文件位于camel-spring-3.8.0.jar 中。由于这个原因,我在春季骆驼应用程序中遇到了异常。

这是一个错误吗?

java apache-camel spring-camel

2
推荐指数
1
解决办法
758
查看次数