Spring Batch:具有多线程执行程序的Tasklet具有与Throttling算法相关的非常糟糕的性能

pmp*_*mpm 8 java performance spring multithreading spring-batch

使用Spring批处理2.2.1,我已经配置了Spring Batch Job,我使用了这种方法:

配置如下:

  • Tasklet使用ThreadPoolTask​​Executor限制为15个线程

  • throttle-limit等于线程数

  • 块用于:

    • 1个JdbcCursorItemReader的同步适配器,允许许多线程按照Spring Batch文档推荐使用它

      您可以将调用同步到read(),只要处理和写入是块中最昂贵的部分,您的步骤可能仍然比单线程配置快得多.

    • JdbcCursorItemReader上的saveState为false

    • 基于JPA的Custom ItemWriter.请注意,它对一个项目的处理可能在处理时间方面有所不同,可能需要几毫秒到几秒(> 60秒).

    • commit-interval设置为1(我知道它可能会更好,但不是问题)

  • 关于Spring Batch doc Recommendmandation,所有jdbc池都没问题

由于以下原因,运行批处理会导致非常奇怪和糟糕的结果:

  • 在某些步骤中,如果项目需要一些时间来处理,那么线程池中的几乎所有线程最终都不会执行任何操作而只会处理,只有慢速编写器正在工作.

看看Spring Batch代码,根本原因似乎在这个包中:

  • 组织/ springframework的/批号/复读/支持/

这种工作方式是一种功能还是限制/错误?

如果它是一个功能,配置的方式是什么方式使所有线程不被长时间的处理工作挨饿而不必重写所有内容?

请注意,如果所有项目都占用相同的时间,一切正常,多线程就可以了,但如果其中一项处理需要花费更多时间,那么多线程在慢速进程工作时几乎无用.

注意我打开了这个问题:

UBI*_*ACK 5

正如亚历克斯所说,似乎这种行为是根据javadocs的合同:

子类只需要提供一个获取下一个结果*的方法,以及一个等待从并发*进程或线程返回所有结果的方法

看着:

TaskExecutorRepeatTemplate#waitForResults

另一个选择是使用分区:

  • 一个TaskExecutorPartitionHandler,它将从Partitionned ItemReader执行项目,见下文
  • Partitioner实现,它给出了ItemReader处理的范围,请参阅下面的ColumnRangePartitioner
  • 将使用分区程序将填充的数据读取数据的CustomReader,请参阅下面的myItemReader配置

Michael Minella在他的Pro Spring Batch一书的第11章中解释了这一点:

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>       
</batch:job>
<!-- This one will create Paritions of Number of lines/ Grid Size--> 
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader"
                writer="manipulatableWriterForTests" commit-interval="1"
                skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
</batch:step>
 <!-- scope step is critical here-->
<bean id="myItemReader"    
                        class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <=  ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
 <!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
                    <value>{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>
Run Code Online (Sandbox Code Playgroud)

Partitioner.java:

 package ...;
  import java.util.HashMap;  
 import java.util.Map;
 import org.springframework.batch.core.partition.support.Partitioner;
 import org.springframework.batch.item.ExecutionContext;
 public class ColumnRangePartitioner  implements Partitioner {
 private String column;
 private String table;
 public Map<String, ExecutionContext> partition(int gridSize) {
    int min =  queryForInt("SELECT MIN(" + column + ") from " + table);
    int max = queryForInt("SELECT MAX(" + column + ") from " + table);
    int targetSize = (max - min) / gridSize;
    System.out.println("Our partition size will be " + targetSize);
    System.out.println("We will have " + gridSize + " partitions");
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);
        if (end >= max) {
            end = max;
        }
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        System.out.println("minValue = " + start);
        System.out.println("maxValue = " + end);
        start += targetSize;
        end += targetSize;
        number++;
    }
    System.out.println("We are returning " + result.size() + " partitions");
    return result;
}
public void setColumn(String column) {
    this.column = column;
}
public void setTable(String table) {
    this.table = table;
}
}
Run Code Online (Sandbox Code Playgroud)