使用spring批处理分区处理大量数据

spr*_*ast 4 spring spring-batch

我正在实现spring批处理作业,使用分区方法处理数据库表中的数百万条记录,如下所示 -

  1. 从分区器中的表中获取唯一的分区代码,并在执行上下文中设置相同的分区代码.

  2. 使用阅读器,处理器和编写器创建一个块步骤,以根据特定的分区代码处理记录.

这种方法是正确的还是有更好的方法来处理这种情况?由于某些分区代码可以具有比其他分区代码更多的记录,因此具有更多记录的那些可能比具有较少记录的分区代码花费更多时间来处理.

是否有可能创建分区/线程来处理如thread1进程1-1000,thread2进程1001-2000等?

如何控制创建的线程数,因为分区代码可以在100左右,我想在5次迭代中只创建20个线程和进程?

如果一个分区失败会发生什么,所有处理都会停止并恢复?

以下是配置 -

 <bean id="MyPartitioner" class="com.MyPartitioner" />
 <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
 <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
  <property name="dataSource" ref="dataSource"/>
  <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
  <property name="rowMapper">
      <bean class="com.MyRowMapper" scope="step"/>
  </property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>

<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
    </batch:tasklet>
</batch:step>
<batch:job id="myjob">
    <batch:step id="mystep">
        <batch:partition step="Step1" partitioner="MyPartitioner">
            <batch:handler grid-size="20" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>
Run Code Online (Sandbox Code Playgroud)

分区 -

public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
List<String> codes = getCodes();

for (String code : codes)
{
    ExecutionContext context = new ExecutionContext();
    context.put("code", code);
    partitionMap.put(code, context);
}
return partitionMap;}}
Run Code Online (Sandbox Code Playgroud)

谢谢

Nen*_*zic 6

我会说这是正确的方法,我不明白为什么你需要每1000个项目有一个线程,如果你对每个独特的分区代码进行分区并且有1000个项目的块,你将在每个线程1000个项目上进行交易,这是IMO好的.

  1. 除了保存唯一的分区代码之外,您还可以通过为每1000个相同的分区代码创建新的子上下文来计算每个分区代码和分区的数量(对于具有2200条记录的分区代码,您将调用3个线程) with context params:1 => partition_key = key1,skip = 0,count = 1000,2 => partition_key = key1,skip = 1000,count = 1000 and 3 => partition_key = key1,skip = 2000,count = 1000)if这就是你想要的,但我仍然会没有它

  2. 控制线程数,ThreadPoolTaskExecutor在创建线程时将传递给分区步骤.你有setCorePoolSize()可以在20上设置的方法,你将获得最多20个线程.下一个细粒度配置grid-size告诉我们将从完整分区映射创建多少个分区.这是网格大小的解释.分区就是分工.之后,您的线程配置将定义实际处理的并发性.

  3. 如果一个分区失败,整个分区步骤将失败,并显示分区失败的信息.成功分区已完成,并且不会再次调用,并且当作业重新启动时,它将通过重做失败和未处理的分区来获取它停止的位置.

希望我能找到你所有的问题,因为有很多问题.

案例1的例子 - 可能有错误,但只是想知道:

public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
    Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
    Map<String, int> codesWithCounts = getCodesWithCounts();

    for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
    {
        for (int i = 0; i < codeWithCount.getValue(); i + 1000){
            ExecutionContext context = new ExecutionContext();
            context.put("code", code);
            context.put("skip", i);
            context.put("count", 1000);
            partitionMap.put(code, context);
        }
    }
    return partitionMap;
}
Run Code Online (Sandbox Code Playgroud)

与你的页数相比,你可以从上下文得到多少你应该跳过的内容,在2200的示例中将是:0,1000,2000.