这个问题的答案解释了<rabbit:listener-container/>当侦听器不是线程安全时如何在Spring AMQP中使用原型范围.
另一位用户(在评论中)询问如何仅使用Java配置来配置相同的环境.
我想用Spring Retry修改数据库连接的创建,以便在应用程序启动时数据库关闭时再试一次。我不想限制重试次数。我应该如何配置策略来做到这一点。
我当前的代码(我知道在这种状态下它限制为100):
SimpleRetryPolicy policy = new SimpleRetryPolicy(100, Collections.singletonMap(Exception.class, true));
// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
Connection conn = template.execute(new RetryCallback<Connection, Exception>() {
public Connection doWithRetry(RetryContext context) throws Exception {
return getConnectionFactory().createConnection();
}
});
Run Code Online (Sandbox Code Playgroud)
我应该如何修改此代码?
如何将 spring-boot 属性: spring.kafka. Producer.retries 设置为 Integer.MAX_VALUE ?
取消设置此属性是否有效,或者默认为 0 ?
@查看KIP中的默认kafka https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
我有这个配置:
@Configuration
public class KafkaTopicConfig {
private final TopicProperties topics;
public KafkaTopicConfig(TopicProperties topics) {
this.topics = topics;
}
@Bean
public NewTopic newTopicImportCharge() {
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CHARGES.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
}
@Bean
public NewTopic newTopicImportPayment() {
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_PAYMENTS.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
}
@Bean
public NewTopic newTopicImportCatalog() {
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CATALOGS.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
}
}
Run Code Online (Sandbox Code Playgroud)
我可以将 10 个不同的主题添加到 TopicProperties. 而且我不想手动创建每个类似的 bean。是否存在某种方法可以在spring-kafka或仅spring 中创建所有主题?
如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.
如何在MessageListenerAdapter的handleMessage中获取MessageProperties?
public class EventMessageAdapter {
public void handleMessage(MessageProperties messageProperties, Event event) {
...
String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud) 我正在使用一个使用spring-kafka的Kafka监听器的应用程序.我面临的问题是当Kafka监听器没有打开时弹簧上下文初始化失败(各种原因,例如Kafka服务器没有打开或关闭).如何确保我的应用程序是独立的.
谁能请帮忙.
我在我的服务中使用TransactionSynchronizationManager.registerSynchronization(...),但无法为此方法编写测试,因为我收到异常:java.lang.IllegalStateException: Transaction synchronization is not active。
我的服务的实施:
@Transactional
public void save(Order order) {
log.info("Starting transaction...");
orderDAO.save(order);
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
log.info("#afterCommit");
eventPublisher.publishEvent(new OrderPlacedEvent(order));
}
}
);
log.info("Commit");
}
Run Code Online (Sandbox Code Playgroud)
并测试哪个不起作用:
@RunWith(SpringRunner.class)
public class OrderServiceTest {
private OrderDAO orderDAO;
private ApplicationEventPublisher eventPublisher;
private OrderService orderService;
@Before
public void setUp() throws Exception {
orderDAO = mock(OrderDAO.class);
eventPublisher = mock(ApplicationEventPublisher.class);
orderService = new OrderService(
orderDAO,
eventPublisher
);
}
@Test
public void save() throws Exception { …Run Code Online (Sandbox Code Playgroud) 我正在使用 @RabbitListener 注释和 SimpleRabbitListenerContainerFactory bean 并行执行rabbitmq 消息并通过以下方式设置最小和最大并发消费者:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(MIN_RABBIT_CONCURRENT_CONSUMERS);
factory.setMaxConcurrentConsumers(MAX_RABBIT_CONCURRENT_CONSUMERS);
factory.setConsecutiveActiveTrigger(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
最小限制为 3,最大限制为 10。使用此配置,即使队列中有 12 条消息,也只有 3 条消息并行执行。
请告诉我配置有什么问题?
我仍在使用rabbitmq和spring云消息传递设置示例消息传递系统,但遇到了错误(?),或者我误解了文档。(使用spring-boot版本2.0.3.RELEASE)
为了这个例子,我想要以下设置
spring:
cloud:
stream:
rabbit:
bindings:
foo:
consumer:
auto-bind-dlq: true
instanceCount: 2
instanceIndex: 0
bindings:
foo:
destination: foo
group: fooGroup
consumer:
maxAttempts: 4
backOffInitialInterval: 10000
backOffMultiplier: 10.0
fooChannel:
destination: foo
Run Code Online (Sandbox Code Playgroud)
这个问题有趣的部分是spring.cloud.stream.bindings.foo.consumer,我设置了 4 个 maxAttempts,初始退避间隔为 10 秒,乘数为 10。
应用 maxAttempts 和初始间隔,但不应用乘数。根据文档(此处和此处),按键采用驼峰式设计,但backOffInitialInterval在应用时似乎也能正常工作back-off-initial-interval。我对按键的所有不同方式感到有点困惑,但那是另一个故事了。
我已经尝试了所有可能的写作方式backOffMultiplier,但它没有得到应用,消息每 10 秒发送一次。
现在,为了测试到底出了什么问题,我@Bean手动设置并配置了 RetryTemplate
@Bean
RetryTemplate retryTemplate() {
RetryTemplate r = new RetryTemplate();
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(10000);
exponentialBackOffPolicy.setMultiplier(10.0);
exponentialBackOffPolicy.setMaxInterval(100000);
r.setBackOffPolicy(exponentialBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy …Run Code Online (Sandbox Code Playgroud) 我正在使用spring boot 2.0.7 Release和spring-boot-starter-mail-2.0.7.Release.
我javaMailsender在班级内部自动装配在 Windows 上工作正常,同时尝试部署解决Unixbelwo 问题
APPLICATION FAILED TO START
***************************
Description:
Field javaMailSender in com.fti.di.capstock.tran.pub.email.SendEmail required a bean of type 'org.springframework.mail.javamail.JavaMailSender' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action:
Consider defining a bean of type 'org.springframework.mail.javamail.JavaMailSender' in your configuration.
Run Code Online (Sandbox Code Playgroud)
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import com.fti.di.capstock.tran.pub.constants.ApplicationFlowConstants;
import com.fti.di.integration.constants.IntegrationConstants;
import com.fti.di.integration.util.StringUtil;
@Component("sendEmail")
public class SendEmail {
@Autowired
private JavaMailSender …Run Code Online (Sandbox Code Playgroud) spring ×6
spring-boot ×5
apache-kafka ×3
java ×3
rabbitmq ×3
spring-amqp ×3
spring-kafka ×3
jdbc ×1
spring-retry ×1