使用JPA存储库的Spring Batch ItemWriter持续存在问题

bal*_*teo 6 java spring spring-batch spring-data-jpa spring-boot

我有一个弹簧批处理问题,该批处理ItemWriter依赖JPA存储库来更新数据。

这里是:

@Component
public class MessagesDigestMailerItemWriter implements ItemWriter<UserAccount> {

    private static final Logger log = LoggerFactory.getLogger(MessagesDigestMailerItemWriter.class);

    @Autowired
    private MessageRepository messageRepository;

    @Autowired
    private MailerService mailerService;

    @Override
    public void write(List<? extends UserAccount> userAccounts) throws Exception {
        log.info("Mailing messages digests and updating messages notification statuses");

        for (UserAccount userAccount : userAccounts) {
            if (userAccount.isEmailNotification()) {
                mailerService.mailMessagesDigest(userAccount);
            }
            for (Message message : userAccount.getReceivedMessages()) {
                message.setNotificationSent(true);
                messageRepository.save(message);//NOT SAVING!!
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我的Step配置:

@Configuration
public class MailStepConfiguration {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private MessagesDigestMailerItemWriter itemWriter;

    @Bean
    public Step messagesDigestMailingStep() {
        return stepBuilderFactory.get("messagesDigestMailingStep")//
                .<UserAccount, UserAccount> chunk(1)//
                .reader(jpaPagingItemReader(entityManagerFactory))//
                .writer(itemWriter)//
                .build();
    }

    @Bean(destroyMethod = "")
    @StepScope
    public static ItemReader<UserAccount> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
        final JpaPagingItemReader<UserAccount> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("SELECT ua FROM UserAccount ua JOIN FETCH ua.receivedMessages msg WHERE msg.notificationSent = false AND msg.messageRead = false");
        return reader;
    }

}
Run Code Online (Sandbox Code Playgroud)

为了完整起见,这是我的spring boot配置:

@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
@ComponentScan("com.bignibou.batch.configuration")
public class Batch {
    public static void main(String[] args) {
        System.exit(SpringApplication.exit(new SpringApplicationBuilder(Batch.class).web(false).run(args)));
    }
}
Run Code Online (Sandbox Code Playgroud)

和我的数据源配置:

@Configuration
@EnableJpaRepositories({ "com.bignibou.repository" })
@EntityScan("com.bignibou.domain")
public class DatasourceConfiguration {

    @Bean
    @ConfigurationProperties("spring.datasource.batch")
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.application")
    public DataSource applicationDatasource() {
        return DataSourceBuilder.create().build();
    }
}
Run Code Online (Sandbox Code Playgroud)

我注意到执行流进入了ItemWriter的write方法,并且messageRepository.save(message);确实得到执行,但是数据没有更新。

我怀疑这是交易问题,但不确定如何解决此问题...

编辑:我忘了提到我有两个 Postgres数据库:

  1. 一个用于作业存储库数据
  2. 另一个用于应用程序数据。

我可以确认数据已写入作业存储库数据库中。问题出在应用程序数据上。考虑到我有两个PG数据库,我需要使用分布式事务。

Pet*_*řák 7

我在这里为此提出了一个问题:

https://jira.spring.io/browse/BATCH-2642

原则上,对我们有帮助的是配置​​主事务管理器,如下所示:

@Configuration
public class JpaConfig {

    private final DataSource dataSource;

    @Autowired
    public JpaConfig(@Qualifier("dataSource") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager() {
        final JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }

}
Run Code Online (Sandbox Code Playgroud)

然后在配置步骤时使用事务管理器的自动装配实例,如下所示:

@Autowired
private PlatformTransactionManager transactionManager;

private TaskletStep buildTaskletStep() {
        return stepBuilderFactory.get("SendCampaignStep")
                    .<UserAccount, UserAccount>chunk(pushServiceConfiguration.getCampaignBatchSize())
                    .reader(userAccountItemReader)
                    .processor(userAccountItemProcessor)
                    .writer(userAccountItemWriter)
                    .transactionManager(transactionManager)
                    .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

数据现在已正确保存,但仍有一些我没有完全理解的魔力......


lub*_*nac 1

你应该@EnableTransactionManagement在你的主课上。我相信 Spring Boot 将为您创建事务管理器,但如果您想覆盖默认值,您可能需要显式配置它

Spring Batch 提供了用于更改事务属性的 API

  • 非常感谢您的回答。我尝试了“@EnableTransactionManagement”但无济于事。我相信它是与“@Transaction”一起使用的,不建议与 Spring Batch 一起使用。 (2认同)