如果从计划作业调用,Spring Boot 存储库不会保存到数据库

Fan*_*oos 2 java spring spring-mvc spring-batch spring-boot

我有一个 Spring Boot 应用程序,我需要在其中安排一项作业以从特定目录读取文件并将数据存储到数据库中。

由于文件数量非常大,我使用 Spring Batch 来处理文件部分。

该应用程序有一个名为 的组件,PraserStarer该组件有一个名为 的方法startParsing。这个方法是用@scheduledannotation来注释的。

@scheduled(fixedDelay = 60 * 1000)
public startParsing(){
    // start spring batch job
}
Run Code Online (Sandbox Code Playgroud)

我有一个存储库接口NewsRepositry注入到 Spring Batch 第一步的编写器中。

该应用程序有一个简单的控制器来手动调用该startParsing方法。从控制器调用 startParsing 方法时,一切正常。spring批处理作业正常启动,读取文件,将数据写入DB,并对文件进行归档。

当从调度框架调用该方法时startParsing,Spring Batch作业正常启动,并读取文件,但数据库中没有存储任何内容。

我怀疑这里的问题是有两种不同的上下文,一种用于调度部分,另一种用于应用程序的其余部分。

由于某种原因,调度上下文中没有事务管理器,这导致没有任何内容进入数据库。

1-我的怀疑正确吗?

2-如果是,我如何强制事务管理器加载到其他上下文?

编辑

解析器起始类的代码如下

@Component
public class ParserStarter {

    @Autowired
    JobLauncher jobLauncher;

    @Value("${app.data_directory}")
    private String dataDir;

    @Autowired
    private ParserJobListener jobListener;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    public Resource[] getResources() throws IOException {
        // return array of file resource to be processed
    }

//  @Scheduled(fixedDelay = 60 * 1000)
    public void startParsing() throws Exception {
        String jobName = System.currentTimeMillis() + " New Parser Job";

        JobParameters jobParameters = new JobParametersBuilder().addString("source", jobName).toJobParameters();

        jobLauncher.run(getParsingJob(), jobParameters);
    }

    @Bean(name="getParsingJob")
    private Job getParsingJob() throws IOException {

        jobListener.setResources(getResources());

        Step processingStep = jobListener.processingStep();

        Step archivingStep = jobListener.archivingStep();

        Job job = jobBuilderFactory.get("Store News").incrementer(new RunIdIncrementer())
                .listener(jobListener).start(processingStep).next(archivingStep).build();

        return job;
    }
}
Run Code Online (Sandbox Code Playgroud)

作业侦听器的代码如下

@Component
public class ParserJobListener extends JobExecutionListenerSupport {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Resource[] resources;

    @Value("${app.archive_directory}")
    private String archiveDirectory;

    @Autowired
    private Writer writer;

    public MultiResourceItemReader<DataRecord> multiResourceItemReader() {
        MultiResourceItemReader<DataRecord> resourceItemReader = new MultiResourceItemReader<DataRecord>();
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(new Reader());
        return resourceItemReader;
    }

    public Step archivingStep() {
        FileArchivingTask archivingTask = new FileArchivingTask(resources, archiveDirectory);
        return stepBuilderFactory.get("Archiving step").tasklet(archivingTask).build();
    }

    public Step processingStep() {
        return stepBuilderFactory.get("Process news file").<DataRecord, DataRecord>chunk(1000)
                .reader(multiResourceItemReader()).writer(writer).build();
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("Job finished")
        }
    }

    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

}
Run Code Online (Sandbox Code Playgroud)

剩下的就是作者了,如下

@Component
public class Writer implements ItemWriter<DataRecord>{

    @Autowired
    private DataRepository dataRepo;

    @Override
    public void write(List<? extends DataRecord> items) throws Exception {
        dataRepo.saveAll(items);
    }

}
Run Code Online (Sandbox Code Playgroud)

编辑2

我已经更改了作者的写入方法来单独保存和刷新每个项目,如下所示

@Transactional
    public void write(List<? extends GdeltRecord> items) throws Exception {
        for (GdeltRecord gdeltRecord : items) {
            dataRepo.saveAndFlush(gdeltRecord);
        }
//      dataRepo.saveAll(items);
 }
Run Code Online (Sandbox Code Playgroud)

这次应用程序抛出TransactionRequiredException: no transaction is in progress异常。

这是异常的完整堆栈跟踪

Caused by: javax.persistence.TransactionRequiredException: no transaction is in progress
    at org.hibernate.internal.SessionImpl.checkTransactionNeeded(SessionImpl.java:3552) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1444) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1440) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerInvocationHandler.invoke(ExtendedEntityManagerCreator.java:350) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:308) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
    at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:533) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:504) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:359) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:200) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:644) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:608) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.lambda$invoke$3(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:59) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    ... 66 common frames omitted
Run Code Online (Sandbox Code Playgroud)

Fan*_*oos 5

我尝试了这个问题中描述的方法(JpaItemWriter:没有事务正在进行),它对我有用。

我定义了一个JpaTransactionManagerbean 并将其与步骤配置一起使用。

    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager() {
        final JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }
Run Code Online (Sandbox Code Playgroud)

并在步骤配置中

    @Autowired
    JpaTransactionManager trxm;

    public Step processingStep(Resource[] resources) throws IOException {
        return stepBuilderFactory.get("Process CSV File")
                .transactionManager(trxm)
                .<DataRecord, DataRecord>chunk(1000)
                .reader(multiResourceItemReader()).writer(writer).build();
    }
Run Code Online (Sandbox Code Playgroud)