在Spring Batch中,可以将多个JdbcBatchItemWriters配置为并行写入吗?

Wol*_*ton 5 spring-batch

在我的Spring Batch作业中,我的ItemProcessor将ItemReader读取的对象拆分为七个可变长度的列表.必须将这些列表写入数据库中的七个表,并且任何错误(例如数据库因任何原因拒绝某条记录)都必须导致事务在所有七个表上回滚.

目前,我用这七个列表创建一个包装对象,并将它传递给自定义的ItemWriter.该编写器接收所有这些包装对象,把它们合并成自己的七个列表,这样在处理ItemProcessor返回的一批包装对象时只需执行七次批量写入(使用基于JdbcTemplate的DAO).

我的编写器按顺序调用每个表的插入函数,我想加快这一过程.我想知道是否可以将这些列表并行写入各自的表,使总执行时间等于耗时最长的那次写入的时间.我不能妥协的一个要求是:这必须在单个事务中完成,只要任何一个编写器出现异常,就需要回滚.

inc*_*.de 6

这是一个简单的解决方案:它使用TaskExecutor,并扩展了org.springframework.batch.item.support.CompositeItemWriter.

package de.incompleteco.spring.batch.item.support;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.Assert;

import de.incompleteco.spring.domain.SimpleEntity;

/**
 * Composite writer that passes the same chunk of items to every delegate
 * concurrently, using the configured {@link TaskExecutor}, and returns only
 * once all delegates have finished.
 *
 * <p>Any failure raised by a delegate is rethrown on the calling thread so the
 * surrounding step sees it and can roll the chunk back.
 *
 * <p>NOTE(review): the delegates run on executor threads, not on the thread
 * that called {@link #write(List)}. Spring normally binds transactional
 * resources to the calling thread, so confirm that the delegates really take
 * part in the chunk's transaction in your environment before relying on
 * all-or-nothing behaviour.
 */
public class ParallelCompositeItemWriter extends CompositeItemWriter<SimpleEntity> {

    /** Local copy of the delegates; kept here because this class iterates over them itself. */
    private List<ItemWriter<? super SimpleEntity>> delegates;

    /** Executor on which each delegate's write is run. */
    private TaskExecutor taskExecutor;

    /**
     * Writes the given items through every delegate in parallel and blocks
     * until all of them have completed.
     *
     * @param item the chunk of items to hand to each delegate
     * @throws Exception the first failure reported by any delegate, or
     *         {@link InterruptedException} if the calling thread is interrupted
     *         while waiting
     */
    @Override
    public void write(final List<? extends SimpleEntity> item) throws Exception {
        final List<Future<Void>> pending = new ArrayList<Future<Void>>(delegates.size());
        for (final ItemWriter<? super SimpleEntity> writer : delegates) {
            // FutureTask captures whatever the delegate throws, so the failure
            // can be observed from this thread instead of dying on the worker.
            final FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    writer.write(item);
                    return null;
                }
            });
            taskExecutor.execute(task);
            pending.add(task);
        }
        awaitAll(pending);
    }

    /**
     * Waits for every submitted task and rethrows the first failure, if any.
     * All tasks are awaited even after a failure so no delegate is still
     * writing when control returns to the step.
     */
    private void awaitAll(List<Future<Void>> pending) throws Exception {
        Throwable failure = null;
        for (Future<Void> future : pending) {
            try {
                future.get();
            } catch (ExecutionException e) {
                if (failure == null) {
                    failure = (e.getCause() != null) ? e.getCause() : e;
                }
            } catch (InterruptedException e) {
                // Stop waiting: ask the outstanding work to stop and keep the interrupt flag set.
                for (Future<Void> outstanding : pending) {
                    outstanding.cancel(true);
                }
                Thread.currentThread().interrupt();
                throw e;
            }
        }
        if (failure instanceof Exception) {
            throw (Exception) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure != null) {
            throw new IllegalStateException(failure);
        }
    }

    /**
     * Sets the executor used to run the delegates.
     *
     * @param taskExecutor executor that runs one task per delegate
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Stores the delegates locally and also passes them to the superclass.
     *
     * @param delegates writers that each receive every chunk
     */
    @Override
    public void setDelegates(List<ItemWriter<? super SimpleEntity>> delegates) {
        this.delegates = delegates;
        super.setDelegates(delegates);
    }

    /**
     * Runs the superclass checks and additionally requires a task executor.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        Assert.notNull(taskExecutor,"Task executor needs to be set");
    }

}
Run Code Online (Sandbox Code Playgroud)

示例配置如下:

<!-- Job with a single chunk-oriented step: items come from the "reader" bean,
     go to the "writer" bean, and the commit interval is 10. -->
<batch:job id="simpleJob">
    <batch:step id="simpleJob.step1">
        <batch:tasklet>
            <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader constructed from the "itemList" bean. -->
<bean id="reader" class="org.springframework.batch.item.support.IteratorItemReader">
    <constructor-arg ref="itemList"/>
</bean>

<!-- The parallel composite writer: receives the delegate list and the executor
     on which those delegates are run. -->
<bean id="writer" class="de.incompleteco.spring.batch.item.support.ParallelCompositeItemWriter">
    <property name="delegates" ref="writerDelegates"/>
    <property name="taskExecutor" ref="writerTaskExecutor"/>
</bean>

<!-- Delegate writers handed to the composite writer. Both run the same insert
     into test_table; the first targets dataSource1, the second dataSource2.
     The :idCol and :stuff named parameters are supplied by a
     BeanPropertyItemSqlParameterSourceProvider. -->
<util:list id="writerDelegates">
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource1"/>
        <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource2"/>
        <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>     
</util:list>

<!-- Fixed sample input for the reader: three SimpleEntity beans, each built
     with a single constructor argument ("stuff1" .. "stuff3"). -->
<util:list id="itemList">
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff1"/>
    </bean>
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff2"/>
    </bean>     
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff3"/>
    </bean>     
</util:list>

<!-- Executor injected into the "writer" bean as its taskExecutor; pool size 3. -->
<task:executor id="writerTaskExecutor" pool-size="3"/>


<!-- Bitronix pooling data source over the in-memory H2 database "a".
     The uniqueName carries a per-bean prefix: a bare timestamp could be
     identical for two pools created within the same millisecond, and each
     Bitronix resource needs its own name. -->
<bean id="dataSource1" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" />
    <property name="uniqueName" value="dataSource1-#{T(System).currentTimeMillis()}" />
    <property name="allowLocalTransactions" value="true"/>
    <property name="maxPoolSize" value="2" />
    <property name="driverProperties">
        <props>
            <prop key="URL">jdbc:h2:mem:a;DB_CLOSE_DELAY=-1</prop>
        </props>
    </property>
</bean>

<!-- Bitronix pooling data source over the in-memory H2 database "b".
     The uniqueName carries a per-bean prefix: a bare timestamp could be
     identical for two pools created within the same millisecond, and each
     Bitronix resource needs its own name. -->
<bean id="dataSource2" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" />
    <property name="uniqueName" value="dataSource2-#{T(System).currentTimeMillis()}" />
    <property name="allowLocalTransactions" value="true"/>
    <property name="maxPoolSize" value="2" />
    <property name="driverProperties">
        <props>
            <prop key="URL">jdbc:h2:mem:b;DB_CLOSE_DELAY=-1</prop>
        </props>
    </property>
</bean>

<!-- Run the same schema script (classpath:/META-INF/sql/schema-h2.sql)
     against each of the two data sources. -->
<jdbc:initialize-database  data-source="dataSource1">
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>

<jdbc:initialize-database  data-source="dataSource2">
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>
<!-- XA transaction setup: the Bitronix configuration and transaction manager
     are obtained through static factory methods on TransactionManagerServices.
     Spring's JtaTransactionManager is then given the same Bitronix instance as
     both its transactionManager and its userTransaction. -->

<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices"/>

<!-- depends-on makes sure btmConfig is created first; shutdown is called when the bean is destroyed. -->
<bean id="BitronixTransactionManager" factory-method="getTransactionManager"
    class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig" destroy-method="shutdown" />

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="BitronixTransactionManager" />
    <property name="userTransaction" ref="BitronixTransactionManager" />
</bean>
Run Code Online (Sandbox Code Playgroud)

这个例子使用了以下内容:

  • 使用Bitronix JTA来支持跨多个数据库的事务
  • 一个非常简单的模型:把简单实体写成简单的JDBC记录

(数据库部分做得比较粗糙,仅作示例)