ody*_*our 9 java spring transactions reactive-programming rx-java
我正在使用RxJava 1.1从Spring应用程序内部组成一个可观察的序列,如下所示:
@Transaction
public Observable<Event> create(Event event) {
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
Run Code Online (Sandbox Code Playgroud)
事件从某个控制器类到达我的应用程序的服务层,并通过使用RxJava的flatMap()函数构建的一系列操作进行传播.该事件首先存储在数据库(Spring Data)中,然后使用Spring的AsyncRestTemplate库在后台依次执行下两个异步HTTP请求.
如果在管道中的任何地方抛出错误/异常,我希望能够回滚数据库事务,以便事件不存储在数据库中.我发现这并不容易,因为在Spring中,事务上下文与特定的执行线程相关联.因此,如果代码在另一个线程上到达doOnError回调(AsyncRestTemplate使用自己的AsyncTaskExecutor),则无法回滚最初创建的事务.
您能否建议任何机制在多线程应用程序中实现事务,这些应用程序由以这种方式编写的多个异步操作组成?
我还试图以编程方式创建一个事务:
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
Run Code Online (Sandbox Code Playgroud)
然后在整个管道中发送transactionStatus对象和事件,但是当发生错误并且我调用"platformTransactionManager.rollback(status);"时,我得到"事务同步不活动",因为它在另一个线程上运行我猜.
ps sendEventToServiceA/sendEventToServiceB方法看起来类似于:
public Observable<Event> sendEventToServiceA(event) {
..........
ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
"/serviceA/create?event_id=" + event.id,
HttpMethod.POST, requestEntity, String.class);
return ObservableUtil.toRxObservable(listenableFuture);
}
Run Code Online (Sandbox Code Playgroud)
一种方法是确保在与db保存相同的线程上观察到错误:
@Transaction
public Observable<Event> create(Event event) {
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
.subscribeOn(scheduler)
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.observeOn(scheduler)
.doOnError( throwable -> {
// ? rollback initally created transaction?
})
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2410 次 |
| 最近记录: |