小编Gar*_*ell的帖子

如何使用非线程安全的MessageListener实现并发

这个问题的答案解释了<rabbit:listener-container/>当侦听器不是线程安全时如何在Spring AMQP中使用原型范围.

另一位用户(在评论中)询问如何仅使用Java配置来配置相同的环境.

rabbitmq spring-rabbit spring-amqp

4
推荐指数
1
解决办法
1181
查看次数

如何设置Spring Retry模板重试最大尝试次数:无限

我想用Spring Retry修改数据库连接的创建,以便在应用程序启动时数据库关闭时再试一次。我不想限制重试次数。我应该如何配置策略来做到这一点。

我当前的代码(我知道在这种状态下它限制为100):

SimpleRetryPolicy policy = new SimpleRetryPolicy(100, Collections.singletonMap(Exception.class, true));

// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
Connection conn = template.execute(new RetryCallback<Connection, Exception>() {
    public Connection doWithRetry(RetryContext context) throws Exception {
        return getConnectionFactory().createConnection();
    }
});
Run Code Online (Sandbox Code Playgroud)

我应该如何修改此代码?

java spring multithreading jdbc spring-retry

4
推荐指数
1
解决办法
1886
查看次数

Kafka如何将生产者重试设置为无限

如何将 spring-boot 属性: spring.kafka. Producer.retries 设置为 Integer.MAX_VALUE ?

取消设置此属性是否有效,或者默认为 0 ?

@查看KIP中的默认kafka https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

apache-kafka spring-boot spring-kafka

4
推荐指数
1
解决办法
9285
查看次数

如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?

我有这个配置:

@Configuration
public class KafkaTopicConfig {

    private final TopicProperties topics;

    public KafkaTopicConfig(TopicProperties topics) {
        this.topics = topics;
    }

    @Bean
    public NewTopic newTopicImportCharge() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CHARGES.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }

    @Bean
    public NewTopic newTopicImportPayment() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_PAYMENTS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }

    @Bean
    public NewTopic newTopicImportCatalog() {
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CATALOGS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    }
}
Run Code Online (Sandbox Code Playgroud)

我可以将 10 个不同的主题添加到 TopicProperties. 而且我不想手动创建每个类似的 bean。是否存在某种方法可以在spring-kafka或仅spring 中创建所有主题?

spring apache-kafka spring-boot spring-kafka spring-config

4
推荐指数
1
解决办法
6556
查看次数

Spring amqp:如何在MessageListenerAdapter中读取MessageProperties

如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.

如何在MessageListenerAdapter的handleMessage中获取MessageProperties?

public class EventMessageAdapter {

  public void handleMessage(MessageProperties messageProperties, Event event)    {
  ...
  String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
2794
查看次数

即使Kafka监听器(spring-kafka)没有初始化,如何启动spring应用程序

我正在使用一个使用spring-kafka的Kafka监听器的应用程序.我面临的问题是当Kafka监听器没有打开时弹簧上下文初始化失败(各种原因,例如Kafka服务器没有打开或关闭).如何确保我的应用程序是独立的.

谁能请帮忙.

spring apache-kafka spring-kafka

3
推荐指数
1
解决办法
1659
查看次数

Spring - 使用 TransactionSynchronizationManager 的测试方法

我在我的服务中使用TransactionSynchronizationManager.registerSynchronization(...),但无法为此方法编写测试,因为我收到异常:java.lang.IllegalStateException: Transaction synchronization is not active

我的服务的实施:

@Transactional
public void save(Order order) {
    log.info("Starting transaction...");
    orderDAO.save(order);
    TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    log.info("#afterCommit");
                    eventPublisher.publishEvent(new OrderPlacedEvent(order));
                }
            }
    );
    log.info("Commit");
}
Run Code Online (Sandbox Code Playgroud)

并测试哪个不起作用:

@RunWith(SpringRunner.class)
public class OrderServiceTest {

    private OrderDAO orderDAO;
    private ApplicationEventPublisher eventPublisher;
    private OrderService orderService;

    @Before
    public void setUp() throws Exception {
        orderDAO = mock(OrderDAO.class);
        eventPublisher = mock(ApplicationEventPublisher.class);
        orderService = new OrderService(
                orderDAO,
                eventPublisher
        );
    }

    @Test
    public void save() throws Exception { …
Run Code Online (Sandbox Code Playgroud)

java spring spring-transactions

3
推荐指数
1
解决办法
7285
查看次数

Spring Boot 中的 Rabbitmq 并发消费者

我正在使用 @RabbitListener 注释和 SimpleRabbitListenerContainerFactory bean 并行执行rabbitmq 消息并通过以下方式设置最小和最大并发消费者:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(MIN_RABBIT_CONCURRENT_CONSUMERS);
    factory.setMaxConcurrentConsumers(MAX_RABBIT_CONCURRENT_CONSUMERS);
    factory.setConsecutiveActiveTrigger(1);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

最小限制为 3,最大限制为 10。使用此配置,即使队列中有 12 条消息,也只有 3 条消息并行执行。

请告诉我配置有什么问题?

spring rabbitmq spring-amqp spring-boot

3
推荐指数
2
解决办法
1万
查看次数

春云流兔的退避设置

我仍在使用rabbitmq和spring云消息传递设置示例消息传递系统,但遇到了错误(?),或者我误解了文档。(使用spring-boot版本2.0.3.RELEASE)

为了这个例子,我想要以下设置

  spring:
    cloud:
      stream:
        rabbit:
          bindings:
            foo:
              consumer:
                auto-bind-dlq: true
        instanceCount: 2
        instanceIndex: 0
        bindings:
          foo:
            destination: foo
            group: fooGroup
            consumer:
              maxAttempts: 4
              backOffInitialInterval: 10000
              backOffMultiplier: 10.0 
          fooChannel:
            destination: foo
Run Code Online (Sandbox Code Playgroud)

这个问题有趣的部分是spring.cloud.stream.bindings.foo.consumer,我设置了 4 个 maxAttempts,初始退避间隔为 10 秒,乘数为 10。

应用 maxAttempts 和初始间隔,但不应用乘数。根据文档(此处此处),按键采用驼峰式设计,但backOffInitialInterval在应用时似乎也能正常工作back-off-initial-interval。我对按键的所有不同方式感到有点困惑,但那是另一个故事了。

我已经尝试了所有可能的写作方式backOffMultiplier,但它没有得到应用,消息每 10 秒发送一次。

现在,为了测试到底出了什么问题,我@Bean手动设置并配置了 RetryTemplate

@Bean
    RetryTemplate retryTemplate() {
        RetryTemplate r = new RetryTemplate();

        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(10000);
        exponentialBackOffPolicy.setMultiplier(10.0);
        exponentialBackOffPolicy.setMaxInterval(100000);
        r.setBackOffPolicy(exponentialBackOffPolicy);

        SimpleRetryPolicy simpleRetryPolicy …
Run Code Online (Sandbox Code Playgroud)

java spring spring-rabbit spring-boot spring-messaging

3
推荐指数
1
解决办法
2663
查看次数

找不到类型为“org.springframework.mail.javamail.JavaMailSender”的bean

我正在使用spring boot 2.0.7 Releasespring-boot-starter-mail-2.0.7.Release.

javaMailsender在班级内部自动装配在 Windows 上工作正常,同时尝试部署解决Unixbelwo 问题


APPLICATION FAILED TO START
***************************

Description:

Field javaMailSender in com.fti.di.capstock.tran.pub.email.SendEmail required a bean of type 'org.springframework.mail.javamail.JavaMailSender' that could not be found.

The injection point has the following annotations:
        - @org.springframework.beans.factory.annotation.Autowired(required=true)


Action:

Consider defining a bean of type 'org.springframework.mail.javamail.JavaMailSender' in your configuration.
Run Code Online (Sandbox Code Playgroud)
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.mail.javamail.MimeMessageHelper;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Component;

    import com.fti.di.capstock.tran.pub.constants.ApplicationFlowConstants;
    import com.fti.di.integration.constants.IntegrationConstants;
    import com.fti.di.integration.util.StringUtil;

    @Component("sendEmail")
    public class SendEmail {

        @Autowired
        private JavaMailSender …
Run Code Online (Sandbox Code Playgroud)

spring-boot

3
推荐指数
3
解决办法
6861
查看次数