小编Gur*_*uru的帖子

Spring RabbitMQ - 在具有@RabbitListener配置的服务上使用手动通道确认

如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}
Run Code Online (Sandbox Code Playgroud)

我的RabbitConfiguration如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory; …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-rabbit spring-amqp spring-rabbitmq

14
推荐指数
2
解决办法
2万
查看次数

当所有代理都关闭时 Spring-Kafka Producer 重试

我正在使用 Springboot 2.3.5.RELEASE 和 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时或者在消息发送到代理之前抛出异常时重试。
以下生产者配置适用于启用重试的幂等生产者。

// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
   props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
   props.put(ProducerConfig.RETRIES_CONFIG, "10");
   props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
... other configs
Run Code Online (Sandbox Code Playgroud)

我知道只有在代理向生产者发送 ack 时出现暂时性错误时,Kafka 生产者重试才会起作用。因此,如果代理在发送消息之前就已关闭,则上述重试不起作用。因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果经纪人宕机或者出现网络故障,我不希望我的制作人停止工作

@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
        backoff = @Backoff(delayExpression = "1000")) …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-kafka

2
推荐指数
1
解决办法
5468
查看次数

Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams

我正在尝试使用 Kafka Streams 做一个简单的 POC。但是,我在启动应用程序时遇到异常。我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring boot 2.3.5 Kafka 流配置

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka spring-cloud-stream-binder-kafka

2
推荐指数
1
解决办法
4425
查看次数