我正在尝试使用Spring AMQP的RabbitTemplate实现RabbitMQ的Blocked Listener.在我的代码中我使用的是Spring-amqp 1.1.3版本的jar文件,而我也查看了1.3.1版本,这个版本也不支持.有没有人知道我是否缺少任何支持在RabbitMQ中向新连接注册阻塞侦听器的版本.或者,如果有任何未来版本的spring amqp支持此功能.
示例代码:
Connection connection = factory.newConnection();
connection.addBlockedListener(new BlockedListener() {
@Override
public void handleUnblocked() throws IOException {
System.out.println("Connection is Unblocked");
}
@Override
public void handleBlocked(String arg0) throws IOException {
System.out.println("Connection Blocked");
}
});
com.rabbitmq.client.Channel channel = connection.createChannel();
Run Code Online (Sandbox Code Playgroud) 嗨,我正在开发Spring-boot-RabbitMQ版本1.6。在开发应用程序时,我遇到的查询很少。阅读文档并浏览了其他堆栈溢出问题,但是我无法弄清楚几件事(可能是因为我的内存不好)。如果有人回答我的问题,那就太好了。
1)目前我有4个生产者和4个生产者。生产者可能产生数百万条消息或事件,因此对生产者和消费者使用单个连接将阻止消费者消费消息。生产者和消费者,这样它们就不会阻塞并且会改善性能。我对这种方法正确吗?
2)我正在使用CachingConnectionFactory来通过SimpleRabbitListenerContainerFactory创建连接,同时调用该工厂是否会为我们返回新的连接,因此如果我们使用CachingConnectionFactory,我们是否真的需要为Producer和Consumer编写一个单独的连接工厂。请在下面找到我的
1)配置类
@Configuration
@EnableRabbit
public class RabbitMqConfiguration{
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Value("${concurrent.consumers}")
public int concurrent_consumers;
@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(concurrent_consumers);
factory.setMaxConcurrentConsumers(max_concurrent_consumers);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
Run Code Online (Sandbox Code Playgroud)
2)生产者阶层
@Configuration
public class TaskProducerConfiguration extends RabbitMqConfiguration {
@Value("${queue1}")
public String queue1;
@Value("${queue2}")
public String queue2;
@Value("${queue3}")
public String …Run Code Online (Sandbox Code Playgroud) 我的问题是如何在 SI 端点之间传递对象?
我发现的几乎每个示例都使用 XML 设置,我使用的是 Annotation 并且不知道如何解决此异常
Caused by: java.lang.IllegalArgumentException: Could not resolve 'json__TypeId__' in 'javaTypes'.
at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.createJavaType(AbstractJacksonJsonObjectMapper.java:68)
at org.springframework.integration.support.json.Jackson2JsonObjectMapper.extractJavaType(Jackson2JsonObjectMapper.java:116)
at org.springframework.integration.support.json.Jackson2JsonObjectMapper.extractJavaType(Jackson2JsonObjectMapper.java:52)
at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.fromJson(AbstractJacksonJsonObjectMapper.java:61)
at org.springframework.integration.json.JsonToObjectTransformer.doTransform(JsonToObjectTransformer.java:87)
at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
... 18 more
Run Code Online (Sandbox Code Playgroud)
人们建议使用 xml 来解决这个问题,例如
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.MyObject" />
</bean>
Run Code Online (Sandbox Code Playgroud)
但是我正在使用注释来创建转换器来处理从通道接收的消息,就像这样
@Bean
@Transformer(inputChannel="fromTcp", outputChannel="toHandler")
JsonToObjectTransformer jsonToObjectTransformer() {
ObjectMapper mapper = new ObjectMapper();
JsonObjectMapper<JsonNode, JsonParser> jm = new Jackson2JsonObjectMapper(mapper);
return new JsonToObjectTransformer(jm);
}
Run Code Online (Sandbox Code Playgroud)
实际上我不知道如何在 SI 端点之间传递对象。我只能通过 String 和 SI 使用默认序列化来处理 String 到 byte[],以及 byte[] 到 String。
我正在尝试对一个简单的流程进行单元测试,它检查文件是否存在,然后执行一些其他任务。
集成流程
@Bean
public IntegrationFlow initiateAlarmAck() {
return IntegrationFlows.from("processAckAlarmInputChannel")
.handle((payload, headers) -> {
LOG.info("Received initiate ack alarm request at " + payload);
File watermarkFile = getWatermarkFile();
if(watermarkFile.isFile()){
LOG.info("Watermark File exists");
return true;
}else{
LOG.info("File does not exists");
return false;
}
})
.<Boolean, String>route(p -> fileRouterFlow(p))
.get();
}
File getWatermarkFile(){
return new File(eventWatermarkFile);
}
@Router
public String fileRouterFlow(boolean fileExits){
if(fileExits)
return "fileFoundChannel";
else
return "fileNotFoundChannel";
}
Run Code Online (Sandbox Code Playgroud)
还有另一个集成流,它从中挑选消息fileNotFoundChannel并进行额外的处理。我不想对这部分进行单元测试。如何停止我的测试以不再做进一步的测试并在发布消息后停止fileNotFoundChannel?
@Bean
public IntegrationFlow fileNotFoundFlow() {
return IntegrationFlows.from("fileNotFoundChannel")
.handle((payload, headers) -> …Run Code Online (Sandbox Code Playgroud) 我有一个 JMS 生产者发送两种消息:业务逻辑和心跳消息。目前,两者都由同一个接收器处理,但我现在尝试通过使用选择器为每个接收器提供专用的类。我遇到的问题是,每当我将选择器添加到接收器时,它就会停止接收消息。这是我到目前为止所拥有的。为了简单起见,我只添加了心跳的代码:
要发送消息,我有这个:
private void sendHeartBeat() {
this.buildTemplate().send(new HeartbeatMessageCreator(this.someId));
}
private JmsTemplate buildTemplate() {
if (this.cachedJmsTemplate == null) {
final ActiveMQTopic activeMQTopic = new ActiveMQTopic(this.topic);
this.cachedJmsTemplate = new JmsTemplate(this.config.getCachedConnectionFactory());
this.cachedJmsTemplate.setDefaultDestination(activeMQTopic);
this.cachedJmsTemplate.setPubSubDomain(true);
}
return this.cachedJmsTemplate;
}
Run Code Online (Sandbox Code Playgroud)
心跳消息创建者:
class HeartbeatMessageCreator implements MessageCreator {
private final String someID;
HeartbeatMessageCreator(final String someID) {
this.someID = someID;
}
@Override
public Message createMessage(final Session session) throws JMSException {
final Serializable message = new ZHeartBeat(this.someID);
final Message jmsMessage = session.createObjectMessage(message);
jmsMessage.setJMSType(message.getClass().getName());
jmsMessage.setStringProperty("InternalMessageType", "HeartBeat"); // <-- Setting …Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个多线程侦听器,但所有消息都在同一个线程中执行.运行时,线程ID总是相同的,即使KafkaListerContainerFactory(正确地)是我实例化的那个.如果我几乎同时发送7条消息,我希望前三个同时处理,然后是后三个同时处理,然后是最后一个.我看到的是第一个完成的过程,然后是第二个,然后是第三个,等等.我误解了什么,或者只是错误配置?
这是我的倾听者:
@Component
public class ExampleKafkaController {
Log log = Log.getLog(ExampleKafkaController.class);
@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;
@KafkaListener(topics = "${kafka.example.topic}")
public void listenForMessage(ConsumerRecord<?, ?> record) {
log.info("Got record:\n" + record.value());
System.out.println("Kafka Thread: " + Thread.currentThread());
System.out.println(kafkaListenerContainerFactory);
log.info("Waiting...");
waitSleep(10000);
log.info("Done!");
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.example.topic}")
public String topic;
public void send(String payload) {
log.info("sending payload='" + payload + "' to topic='" + topic + "'");
kafkaTemplate.send(topic, payload);
}
private void waitSleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) { …Run Code Online (Sandbox Code Playgroud) 如何在发送消息后检索 JMS 消息标头但不使用该消息?
这是我的消息发送代码
jmsTemplate.convertAndSend(que, text, message -> {
LOGGER.info("setting JMS Message header values");
message.setStringProperty(RequestContext.HEADER_ID, id);
// LOGGER.info(message.getJMSMessageId()); -- this gives a null value here
return message;
});
Run Code Online (Sandbox Code Playgroud)
消息头仅在消息发布到队列后生成,因此在使用 MessagePostProcessor 时是否有一种简单的方法来检索 JMS 消息头?
这是我的配置:
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
String url = this.environment.getProperty("jms.broker.url");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(url);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(5);
return redeliveryPolicy;
}
.....
Run Code Online (Sandbox Code Playgroud)
这是我的消费者:
@Service("msgConsumer")
public class MessageConsumer {
private static final String ORDER_RESPONSE_QUEUE = "thequeue.Q";
@JmsListener(destination = ORDER_RESPONSE_QUEUE, containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(final Message<String> message) throws Exception {
MessageHeaders headers = message.getHeaders();
LOG.info("Application : headers received : {}", headers);
String response = message.getPayload();
LOG.info("Application …Run Code Online (Sandbox Code Playgroud) 我已经设置了 dlq 和 dlx,但失败的消息没有重定向到 dlq。我正在尝试从java应用程序以及从rabbitmq服务器发送消息到MESSAGES.EXCHANGE,在这两种情况下我都会收到消息,但在抛出异常消息后应该重定向到DLX.MESSAGES.EXCHANGE,但它正在发生。
下面是java代码和rabbitmq服务器的屏幕截图。一切对我来说看起来都不错。在代码或rabbitmq服务器中找不到任何问题。
队列设置代码-
public class DLQAmqpConfiguration {
public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";
public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(MESSAGES_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
.build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(MESSAGES_EXCHANGE);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
}
@Bean …Run Code Online (Sandbox Code Playgroud) 我已经将camel版本升级到最新,发现jar文件中缺少xsd。xsd 文件位于camel-spring-3.8.0.jar 中。由于这个原因,我在春季骆驼应用程序中遇到了异常。
这是一个错误吗?
spring-jms ×3
java ×2
jms ×2
rabbitmq ×2
spring-amqp ×2
spring-boot ×2
amqp ×1
annotations ×1
apache-camel ×1
apache-kafka ×1
dead-letter ×1
jmstemplate ×1
json ×1
spring ×1
spring-camel ×1
spring-kafka ×1