Java: XA transaction propagation across multiple threads

Dre*_*rew 5 java jboss transactions distributed-transactions atomikos

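How can a transaction manager (for example Bitronix, JBoss TS, or Atomikos) be used in Java SE (not Java EE or Spring) to support the following use case?

Suppose we have the following class: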

public class Dao {

    public void updateDatabase(DB db) {
        // connect to db
        // run a SQL statement
    }

}

We create a Java Runnable from it, like this:

public class MyRunnable implements Runnable {

    private final Dao dao;
    private final DB db;

    public MyRunnable(Dao dao, DB db) {
        this.dao = dao;
        this.db = db;
    }

    // Runnable.run() returns nothing and cannot throw checked exceptions.
    @Override
    public void run() {
        dao.updateDatabase(db);
    }
}

Now, in our service layer, we have another class:

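import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Service {

    public void updateDatabases() throws InterruptedException, ExecutionException {

        // BEGIN TRANSACTION (placeholder: the code the question asks for)

        ExecutorService es = Executors.newFixedThreadPool(10);

        ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(es);

        List<Future<Void>> futures = new ArrayList<>();

        Dao dao = new Dao();

        // submit(Runnable, result) needs an explicit result value; null fits Void.
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db1")), null));
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db2")), null));
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db3")), null));

        // Wait for every submitted task; get() rethrows a task failure as ExecutionException.
        for (int i = 0; i < futures.size(); ++i) {
            ecs.take().get();
        }

        es.shutdown();

        // END TRANSACTION (placeholder: the code the question asks for)
    }

}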

The client can be a Servlet or any other multithreaded environment:

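public class MyServlet extends HttpServlet {

    @Override
    protected void service(final HttpServletRequest request, final HttpServletResponse response)
            throws ServletException, IOException {

        Service service = new Service();

        try {
            service.updateDatabases();
        } catch (Exception e) {
            // Surface any failure from the concurrent updates to the container.
            throw new ServletException(e);
        }

    }

}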

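What is the correct code for the BEGIN TRANSACTION and END TRANSACTION parts? Is this even feasible? If not, what needs to change? The requirement is to keep the updateDatabases() method both concurrent (because it will access multiple databases at the same time) and transactional.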
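
Part of what makes this hard is that transaction managers typically associate the active transaction with the thread that began it, so tasks handed to a pool do not see it automatically. The sketch below illustrates that effect with plain JDK classes only. The ThreadLocal named CURRENT_TX is a hypothetical stand-in for that per-thread association, not the API of any of the transaction managers named above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextDemo {

    // Hypothetical stand-in for the per-thread "current transaction" slot.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Binds a value on the calling thread, then asks a pool thread whether it can see it. */
    static boolean workerSeesCallerTx() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // plays the role of "begin" on the calling thread
        try {
            Callable<Boolean> probe = () -> CURRENT_TX.get() != null;
            return es.submit(probe).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            CURRENT_TX.remove();
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees caller tx: " + workerSeesCallerTx()); // prints false
    }
}
```

The worker thread reads its own empty slot, which is why the transaction context has to be handed to each worker explicitly by whatever mechanism the chosen transaction manager offers.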

Dre*_*rew 4

It seems this can be done with Atomikos, using SubTxThread:

//first start a tx
TransactionManager tm = ...
tm.begin();

Waiter waiter = new Waiter();

//the code that calls the first EIS; defined by you
SubTxCode code1 = ...

//the associated thread
SubTxThread thread1 = new SubTxThread(waiter, code1);

//the code that calls the second EIS; defined by you
SubTxCode code2 = ...

//the associated thread
SubTxThread thread2 = new SubTxThread(waiter, code2);

//start each thread
thread1.start();

thread2.start();

//wait for completion of all calls
waiter.waitForAll();

//check result
if ( waiter.getAbortCount() == 0 ) {
    //no failures -> commit tx
    tm.commit();
} else {
    tm.rollback();
}
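
The control flow in the snippet above (start one thread per resource, wait for all of them, commit only if nothing aborted) can be sketched with plain JDK classes. This is only an illustration of the coordination pattern, not of how Atomikos implements SubTxThread or Waiter. The Tx interface and the ParallelWorkCoordinator class are hypothetical names introduced here, and nothing in the sketch enlists XA resources or propagates a real transaction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class ParallelWorkCoordinator {

    /** Hypothetical stand-in for a transaction handle; a real one comes from the transaction manager. */
    interface Tx {
        void commit();
        void rollback();
    }

    /** Runs each task on its own thread, waits for all, and commits only if none failed. */
    static boolean runAll(Tx tx, List<Runnable> tasks) {
        CountDownLatch done = new CountDownLatch(tasks.size());
        AtomicInteger abortCount = new AtomicInteger();
        for (Runnable task : tasks) {
            new Thread(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    abortCount.incrementAndGet(); // record the failure; the decision is made later
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            done.await(); // same role as waiter.waitForAll() in the snippet above
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            tx.rollback(); // outcome unknown, so do not commit
            return false;
        }
        if (abortCount.get() == 0) {
            tx.commit();
            return true;
        }
        tx.rollback();
        return false;
    }

    public static void main(String[] args) {
        Tx printingTx = new Tx() {
            @Override public void commit() { System.out.println("commit"); }
            @Override public void rollback() { System.out.println("rollback"); }
        };
        Runnable ok = () -> { };
        Runnable failing = () -> { throw new IllegalStateException("simulated failure"); };
        runAll(printingTx, Arrays.asList(ok, ok));      // prints "commit"
        runAll(printingTx, Arrays.asList(ok, failing)); // prints "rollback"
    }
}
```

The commit-or-rollback decision is made once, on the coordinating thread, after every worker has reported in. That mirrors the waiter.getAbortCount() check in the answer.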