bal*_*teo 6 java spring spring-batch spring-data-jpa spring-boot
我有一个弹簧批处理问题,该批处理ItemWriter
依赖JPA存储库来更新数据。
这里是:
@Component
public class MessagesDigestMailerItemWriter implements ItemWriter<UserAccount> {
private static final Logger log = LoggerFactory.getLogger(MessagesDigestMailerItemWriter.class);
@Autowired
private MessageRepository messageRepository;
@Autowired
private MailerService mailerService;
@Override
public void write(List<? extends UserAccount> userAccounts) throws Exception {
log.info("Mailing messages digests and updating messages notification statuses");
for (UserAccount userAccount : userAccounts) {
if (userAccount.isEmailNotification()) {
mailerService.mailMessagesDigest(userAccount);
}
for (Message message : userAccount.getReceivedMessages()) {
message.setNotificationSent(true);
messageRepository.save(message);//NOT SAVING!!
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的Step
配置:
@Configuration
public class MailStepConfiguration {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private EntityManagerFactory entityManagerFactory;
@Autowired
private MessagesDigestMailerItemWriter itemWriter;
@Bean
public Step messagesDigestMailingStep() {
return stepBuilderFactory.get("messagesDigestMailingStep")//
.<UserAccount, UserAccount> chunk(1)//
.reader(jpaPagingItemReader(entityManagerFactory))//
.writer(itemWriter)//
.build();
}
@Bean(destroyMethod = "")
@StepScope
public static ItemReader<UserAccount> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
final JpaPagingItemReader<UserAccount> reader = new JpaPagingItemReader<>();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setQueryString("SELECT ua FROM UserAccount ua JOIN FETCH ua.receivedMessages msg WHERE msg.notificationSent = false AND msg.messageRead = false");
return reader;
}
}
Run Code Online (Sandbox Code Playgroud)
为了完整起见,这是我的spring boot配置:
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
@ComponentScan("com.bignibou.batch.configuration")
public class Batch {
public static void main(String[] args) {
System.exit(SpringApplication.exit(new SpringApplicationBuilder(Batch.class).web(false).run(args)));
}
}
Run Code Online (Sandbox Code Playgroud)
和我的数据源配置:
@Configuration
@EnableJpaRepositories({ "com.bignibou.repository" })
@EntityScan("com.bignibou.domain")
public class DatasourceConfiguration {
@Bean
@ConfigurationProperties("spring.datasource.batch")
public DataSource batchDatasource() {
return DataSourceBuilder.create().build();
}
@Bean
@Primary
@ConfigurationProperties("spring.datasource.application")
public DataSource applicationDatasource() {
return DataSourceBuilder.create().build();
}
}
Run Code Online (Sandbox Code Playgroud)
我注意到执行流进入了ItemWriter的write方法,并且messageRepository.save(message);
确实得到执行,但是数据没有更新。
我怀疑这是交易问题,但不确定如何解决此问题...
编辑:我忘了提到我有两个 Postgres数据库:
我可以确认数据已写入作业存储库数据库中。问题出在应用程序数据上。考虑到我有两个PG数据库,我需要使用分布式事务。
我在这里为此提出了一个问题:
https://jira.spring.io/browse/BATCH-2642
原则上,对我们有帮助的是配置主事务管理器,如下所示:
@Configuration
public class JpaConfig {
private final DataSource dataSource;
@Autowired
public JpaConfig(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() {
final JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setDataSource(dataSource);
return transactionManager;
}
}
Run Code Online (Sandbox Code Playgroud)
然后在配置步骤时使用事务管理器的自动装配实例,如下所示:
@Autowired
private PlatformTransactionManager transactionManager;
private TaskletStep buildTaskletStep() {
return stepBuilderFactory.get("SendCampaignStep")
.<UserAccount, UserAccount>chunk(pushServiceConfiguration.getCampaignBatchSize())
.reader(userAccountItemReader)
.processor(userAccountItemProcessor)
.writer(userAccountItemWriter)
.transactionManager(transactionManager)
.build();
}
}
Run Code Online (Sandbox Code Playgroud)
数据现在已正确保存,但仍有一些我没有完全理解的魔力......
你应该@EnableTransactionManagement
在你的主课上。我相信 Spring Boot 将为您创建事务管理器,但如果您想覆盖默认值,您可能需要显式配置它。
Spring Batch 提供了用于更改事务属性的 API。
归档时间: |
|
查看次数: |
5823 次 |
最近记录: |