Spring无法将事务传播到ForkJoin的RecursiveAction

nyx*_*yxz 8 java spring multithreading database-management

我正在尝试实现一个多线程解决方案,以便我可以并行化我的业务逻辑,包括读取和写入数据库.

技术堆栈:Spring 4.0.2,Hibernate 4.3.8

以下是一些要讨论的代码:

组态

@Configuration
public class PartitionersConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        final ForkJoinPoolFactoryBean poolFactory = new ForkJoinPoolFactoryBean();
        return poolFactory;
    }
}
Run Code Online (Sandbox Code Playgroud)

服务

@Service
@Transactional
public class MyService {

    @Autowired
    private OtherService otherService;

    @Autowired
    private ForkJoinPool forkJoinPool;

    @Autowired
    private MyDao myDao;

    public void performPartitionedActionOnIds() {
        final ArrayList<UUID> ids = otherService.getIds();

        MyIdPartitioner task = new MyIdsPartitioner(ids, myDao, 0, ids.size() - 1);
        forkJoinPool.invoke(task);
    }
}
Run Code Online (Sandbox Code Playgroud)

存储库/ DAO

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {

    public MyData getData(List<UUID> list) {
        // ... 
    }
}
Run Code Online (Sandbox Code Playgroud)

RecursiveAction

public class MyIdsPartitioner extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 100;

    private ArrayList<UUID> ids;
    private int fromIndex;
    private int toIndex;

    private MyDao myDao;

    public MyIdsPartitioner(ArrayList<UUID> ids, MyDao myDao, int fromIndex, int toIndex) {
        this.ids = ids;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
        this.myDao = myDao;
    }

    @Override
    protected void compute() {
        if (computationSetIsSamllEnough()) {
            computeDirectly();
        } else {
            int leftToIndex = fromIndex + (toIndex - fromIndex) / 2;
            MyIdsPartitioner leftPartitioner = new MyIdsPartitioner(ids, myDao, fromIndex, leftToIndex);
            MyIdsPartitioner rightPartitioner = new MyIdsPartitioner(ids, myDao, leftToIndex + 1, toIndex);

            invokeAll(leftPartitioner, rightPartitioner);
        }
    }

    private boolean computationSetIsSamllEnough() {
        return (toIndex - fromIndex) < THRESHOLD;
    }

    private void computeDirectly() {
        final List<UUID> subList = ids.subList(fromIndex, toIndex);
        final MyData myData = myDao.getData(sublist);
        modifyTheData(myData);
    }

    private void modifyTheData(MyData myData) {
        // ...
        // write to DB
    }
}
Run Code Online (Sandbox Code Playgroud)

执行此操作后,我得到:

没有找到标记为"强制"传播的交易的现有交易

我理解这是完全正常的,因为事务不会通过不同的线程传播.因此,一种解决方案是在另一个类似问题中提出的每个线程中手动创建事务.但这对我来说不够令人满意,所以我一直在寻找.

在Spring的论坛中,我找到了关于这个主题的讨论.我觉得非常有趣的一个段落:

"我可以想象一个人可以手动将事务上下文传播到另一个线程,但我认为你不应该真的尝试它.事务绑定到单个线程有一个原因 - 基本的底层资源 - jdbc连接 - 不是线程安全的.多个线程中的一个单一连接会破坏基本的jdbc请求/响应合同,如果它可以用于更多的简单示例,那将是一个小小的奇迹."

因此,第一个问题出现了:是否值得对数据库的读/写进行简化,这是否真的会损害数据库的一致性?
如果上面的引用不正确,我怀疑,是否有办法实现以下目标: MyIdPartitioner是Spring管理 - 使用@Scope("prototype") - 并传递所需的参数,以便递归调用它和那种方式将事务管理留给Spring?

nyx*_*yxz 1

经过进一步阅读,我设法解决了我的问题。有点(据我现在看来,一开始就没有问题)。

由于我从数据库中进行的读取是分块的,并且我确信在此期间结果不会被编辑,因此我可以在事务之外进行读取。

在我的情况下,写入也是安全的,因为我写入的所有值都是唯一的,并且不会发生违反约束的情况。所以我也从那里删除了交易。

我所说的“我删除了事务”只是在我的 DAO 中覆盖方法的传播模式,如下所示:

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {

    @Transactional(propagation = Propagation.SUPPORTS)
    public MyData getData(List<UUID> list) {
        // ... 
    }
}
Run Code Online (Sandbox Code Playgroud)

或者,如果您出于某种原因决定需要事务,那么您仍然可以通过将传播设置为 来将事务管理留给 Spring REQUIRED

所以解决方案比我想象的要简单得多。

并回答我的其他问题:

并行化数据库的读/写是否值得,这真的会损害数据库的一致性吗?

是的,这是值得的。只要每个线程都有事务,那就很酷。

有没有办法实现以下目标: MyIdPartitioner 由 Spring 管理 - 使用 @Scope("prototype") - 并将递归调用所需的参数传递给它,这样将事务管理留给 Spring ?

是的,有一种方法可以使用池(另一个 stackoverflow 问题)。或者,您可以将您的 bean 定义为,@Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)但如果您需要为其设置参数,它将无法工作,因为实例的每次使用都会为您提供一个新实例。前任。

@Autowire
MyIdsPartitioner partitioner;

public void someMethod() {
    ...
    partitioner.setIds(someIds);
    partitioner.setFromIndex(fromIndex);
    partitioner.setToIndex(toIndex);
    ...
}
Run Code Online (Sandbox Code Playgroud)

这将创建 3 个实例,并且您将无法使用该对象,因为不会设置字段。

简而言之 - 有一种方法,但我不需要一开始就去追求它。