使用Spring批处理文件项阅读器进行多线程处理

slo*_*ues 4 spring multithreading spring-batch

在Spring Batch中,我试图读取一个CSV文件,并希望将每一行分配给一个单独的线程并进行处理.我试图通过使用TaskExecutor来实现它,但是所有线程正在发生的事情是一次选择同一行.我也尝试使用Partioner实现这个概念,也有同样的事情发生.请参阅下面的我的配置Xml.

步骤说明

    <step id="Step2">
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <!-- split it -->
      <property name="lineTokenizer">
            <bean
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
        </bean>
      </property>
      <property name="fieldSetMapper">   

          <!-- map to an object -->
          <bean
            class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
            <property name="prototypeBeanName" value="user" />
          </bean>           
      </property>

      </bean>
  </property>

       </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 <property name="concurrencyLimit" value="4"/>   
Run Code Online (Sandbox Code Playgroud)

我尝试过不同类型的任务执行器,但它们都以相同的方式运行.如何将每行分配给单独的线程?

dim*_*zak 6

FlatFileItemReader不是线程安全的.在您的示例中,您可以尝试将CS​​V文件拆分为较小的CSV文件,然后使用MultiResourcePartitioner处理其中的每个文件.这可以分两步完成,一个用于拆分原始文件(如10个较小的文件),另一个用于处理拆分文件.这样您就不会遇到任何问题,因为每个文件都由一个线程处理.

例:

<batch:job id="csvsplitandprocess">
     <batch:step id="step1" next="step2master">
    <batch:tasklet>
        <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
    <batch:step id="step2master">
    <partition step="step2" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</batch:step>
</batch:job>

<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="10" />
    </bean>

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>
Run Code Online (Sandbox Code Playgroud)

替代分区的替代方案可能是自定义线程安全的读者,他将为每一行创建一个线程,但可能分区是您的最佳选择