不能停止弹簧批量步骤..
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<description>Example job to get you started. It provides a skeleton for a typical batch application.</description>
<bean id="HelloTasklet" class="c.c.c.HelloTasklet" scope="step"/>
<bean id="completionPolicy" class="org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy"/>
<bean id="chunkTimeout" class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy">
<constructor-arg value="3"/>
</bean>
<bean id="commitCount" class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
<property name="chunkSize" value="200" />
</bean>
<bean id="chunkCompletionPolicy" class="org.springframework.batch.repeat.policy.CompositeCompletionPolicy">
<property name="policies">
<list>
<ref bean="chunkTimeout" />
<ref bean="commitCount" />
</list>
</property>
</bean>
<bean id="RandomChunkSizePolicy" class="c.c.c.RandomChunkSizePolicy"/>
<job id="importInvoices" xmlns="http://www.springframework.org/schema/batch">
<listeners>
<listener ref="loggingListener"/>
</listeners>
<step id="vehicleStep" next="hello">
<tasklet>
<chunk reader="vehicleReader" …Run Code Online (Sandbox Code Playgroud) 我具有以下春季批处理作业配置。只有一个读者,然后将详细信息传递给具有两个特定作者的复合作家。两位作者共享一个公共父对象,并且需要为他们执行的INSERT操作使用相同的JobId。
<bean id="job" parent="simpleJob">
<property name="steps">
<list>
<bean parent="simpleStep">
<property name="itemReader" ref="policyReader"/>
<property name="itemWriter" ref="stagingCompositeWriter"/>
</bean>
</list>
</property>
</bean>
<bean id="stagingCompositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="stagingLoadWriter"/>
<ref bean="stagingPolicyWriter"/>
</list>
</property>
</bean>
<bean id="abstractStagingWriter" class="a.b.c.AbstractStagingWriter" abstract="true">
<property name="stepExecution" value="#{stepExecutionContext}"/>
<property name="hedgingStagingDataSource" ref="hedgingStagingDataSource"/>
</bean>
<bean id="stagingLoadWriter" class="a.b.c.StagingLoadWriter" parent="abstractStagingWriter"/>
<bean id="stagingPolicyWriter" class="a.b.c.StagingPolicyWriter" parent="abstractStagingWriter"/>
Run Code Online (Sandbox Code Playgroud)
当我运行代码时,出现以下错误
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 0): Field or property 'stepExecutionContext' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext'
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:208)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:72)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:93)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:88)
at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:139)
Run Code Online (Sandbox Code Playgroud)
我尝试在不同的地方设置scope …
我正在尝试学习Spring Batch,本书提供了示例接口类,但是命名约定让我失望.
public interface ItemProcessor<I,O> {
O process(I item) throws Exception;
Run Code Online (Sandbox Code Playgroud)
}
我以前从未见过类名中的参数.它们是出于实际目的还是只是作为指导?如果类名被更改,以下代码会有何不同?
public interface ItemProcessor {
O process(I item) throws Exception;
Run Code Online (Sandbox Code Playgroud)
}
覆盖特定的自动配置时,我对spring-boot的行为有些困惑。
我想部分覆盖BatchAutoConfiguration,但是我想我的问题并不特定于BatchAutoConfiguration。实际上,我只想“覆盖”此类的两个方法:
public BatchDatabaseInitializer batchDatabaseInitializer()和public ExitCodeGenerator jobExecutionExitCodeGenerator()。
因此,我编写了以下代码:package ch.test.autoconfig.autoconfigure;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchDatabaseInitializer;
import org.springframework.boot.autoconfigure.batch.BatchProperties;
import org.springframework.boot.autoconfigure.batch.JobExecutionExitCodeGenerator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcOperations;
import javax.sql.DataSource;
/**
* I'm using the same annotations as defined in BatchAutoConfiguration...
*/
@Configuration
@ConditionalOnClass({ JobLauncher.class, DataSource.class, JdbcOperations.class })
@AutoConfigureAfter(HibernateJpaAutoConfiguration.class)
@ConditionalOnBean(JobLauncher.class)
@EnableConfigurationProperties(BatchProperties.class)
// ... but I add @AutoConfigureBefore(BatchAutoConfiguration.class) to take precedence over BatchAutoConfiguration
@AutoConfigureBefore(BatchAutoConfiguration.class)
public …Run Code Online (Sandbox Code Playgroud) 我扩展了官方Spring-Batch站点 - 批处理服务提供的批处理服务代码, 并修改了ItemWriter以生成CSV并写入数据库.
我使用CompositeItemWriter写入CSV和数据库.但是,JdbcBatchItemWriter与CompositeItemWriter无法正常工作.代码如下所示.
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {
CompositeItemWriter<Person> cWriter = new CompositeItemWriter<Person>();
// For DataBase
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
writer.setDataSource(dataSource);
// For CSV
FlatFileItemWriter<Person> csvWriter = new FlatFileItemWriter<Person>();
csvWriter.setResource(new FileSystemResource(new File("./csv/new-data.csv")));
csvWriter.setShouldDeleteIfExists(true);
DelimitedLineAggregator<Person> lineAggregator = new DelimitedLineAggregator<Person>();
lineAggregator.setDelimiter(",");
BeanWrapperFieldExtractor<Person> fieldExtractor = new BeanWrapperFieldExtractor<Person>();
String[] names = {"firstName", "lastName"};
fieldExtractor.setNames(names);
lineAggregator.setFieldExtractor(fieldExtractor);
csvWriter.setLineAggregator(lineAggregator);
List<ItemWriter<? super Person>> mWriter = new ArrayList<ItemWriter<? super Person>>();
mWriter.add(writer); // **Comment this …Run Code Online (Sandbox Code Playgroud) 我很好奇一个人如何设法将读者的所有可用数据向下传递到管道中。
例如,我希望读取器提取所有数据并将整个结果集传递给处理器和写入器。结果集很小,我不担心资源。我以为我已经通过使所有组件(读取器,写入器,处理器)接收并返回已处理项目的集合来正确实现了此目的。
虽然该过程的结果看起来不错,但我看到的是该作业正在读取所有内容,将其向下传递到管道中,然后返回给读取器,读取所有内容并将其向下传递,依此类推。
我已经考虑过创建一个额外的步骤来读取所有数据并将其传递给后续步骤,但是我很好奇我是否可以做到这一点以及如何做到
这份工作看起来像
@Bean
Job job() throws Exception {
return jobs.get("job").start(step1()).build()
}
@Bean
protected Step step1() throws Exception {
return steps.get("step1").chunk(10)
.reader(reader()
.processor(processor()
.writer(writer()).build()
Run Code Online (Sandbox Code Playgroud)
// ....
读者,处理器和作家接受并返回一个列表,例如
class DomainItemProcessor implements ItemProcessor<List<Domain>, List<Domain>>{
Run Code Online (Sandbox Code Playgroud) 在批处理作业步骤配置中,我计划在writer中执行2个查询,第一个查询是更新表A中的记录,然后第二个查询是再次在表A中插入新记录。
到目前为止,我认为CompositeItemWriter可以实现上述目标,即,我需要创建2个JdbcBatchItemWriters,一个用于更新,另一个用于插入。
我的第一个问题是CompositeItemWriter是否适合上述要求?
如果是,那就引出第二个有关交易的问题。例如,如果第一次更新成功,而第二次插入失败。第一次更新交易会自动回滚吗?否则,如何在同一事务中手动提取两个更新?
提前致谢!
如果在 Spring Batch 中触发作业执行,我有几个 Spring Batch 作业可以正常工作。这些作业将使用 JpaItemWriter 读取和写入数据库,其中需要事务。
我的问题是,我有配置为定期运行这些作业的石英调度程序,我收到“javax.persistence.TransactionRequiredException: no transaction is in progress”错误。我知道目前石英正在实例化作业 bean 而不是 spring 本身,这使得 bean 不知道 spring 管理的正在进行的事务,如果我错了,请纠正我。
但是,我尝试了很多方法,但都没有奏效。以下是我目前的配置:
石英属性
org.quartz.scheduler.instanceName=sample_instance
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreCMT
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.MSSQLDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000
Run Code Online (Sandbox Code Playgroud)
调度程序配置文件
@EnableScheduling
@Configuration
public class SchedulerConfig {
@Inject
private DataSource dataSource;
@Inject
private JobsListenerService jobsListenerService;
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory)
throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(jobFactory);
factory.setDataSource(dataSource); …Run Code Online (Sandbox Code Playgroud) 我正在spring-batch应用程序的ItemProcessor中遍历一个列表,但出现此错误:
java.util.ConcurrentModificationException: null
Run Code Online (Sandbox Code Playgroud)
当我将批处理配置设置为使用单个线程时,它可以正常工作。我正在使用SimpleAsyncTaskExecutor。客户是一个实体,并且具有客户别名列表。
private void fillCustomer(final Customer customer, final ExtractLine result){
CustomerAlias customerAlias = customer.getCustomerAlias().stream().filter(s -> s.getAliasType().isAster()).findFirst().orElse(nullAlias);
result.setCustomer(customerAlias.getCalNmeAliasName());
}
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at org.hibernate.collection.internal.AbstractPersistentCollection$IteratorProxy.next(AbstractPersistentCollection.java:815)
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812)
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
at com.liq.sub.impl.MyProcessor.fillCustomer(MyProcessor.java:107)
Run Code Online (Sandbox Code Playgroud) 我正在按照以下链接编写端到端的春季批处理作业测试。
https://docs.spring.io/spring-batch/docs/current/reference/html/testing.html#endToEndTesting
这告诉使用 @RunWith(SpringRunner.class) 注释您的测试类,这是 Junit4 功能,我的测试用例使用 junit5 的老式引擎工作正常。
由于我的其他非批处理相关测试用例在 jupiter 引擎上运行,因此我想在我的端到端 spring 批处理作业测试中使用相同的测试用例。如果我用@ExtendWith(SpringExtension.class) 替换@RunWith(SpringRunner.class),我可以看到没有自动装配的字段被实例化并且具有空值。
由于我是春季批次和木星的新手,我无法理解出了什么问题。
任何指针将不胜感激
junit 测试用例示例代码
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.JobRepositoryTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
@Testcontainers
@SpringBatchTest
@SpringBootTest
@RunWith(SpringRunner.class)
//@ExtendWith(SpringExtension.class) …Run Code Online (Sandbox Code Playgroud)