使用 Spring Data MongoDB 在事务中调用两个不同 ReactiveMongoRepository 中的方法?

Joh*_*han 4 java spring mongodb spring-boot spring-data-mongodb-reactive

在 Spring Data MongoDB 中使用反应式编程模型时,可以执行如下事务:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); 
Run Code Online (Sandbox Code Playgroud)

但是 Spring Data MongoDB 也支持“reactive repositories”,例如:

public interface PersonRepository extends ReactiveMongoRepository<Person, String>

  Flux<Person> findByLocationNear(Point location, Distance distance);
}
Run Code Online (Sandbox Code Playgroud)

public interface CarRepository extends ReactiveMongoRepository<Car, String>

  Flux<Car> findByYear(int year);
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,鉴于你有ReactiveMongoRepository's,你能以某种方式利用 MongoDB 事务,例如在同一个事务中插入 aPerson和吗Car(在这种情况下使用PersonRepositoryCarRepository)?如果是这样,你如何做到这一点?

kak*_*ali 8

我也一直在努力为Mongo DB 和 Spring Boot 的 Reactive 风格的事务支持找到解决方案

但幸运的是我自己想通了。虽然来自谷歌的东西很少也有帮助,但那些是非反应性的。

重要说明- 对于 Spring boot 2.2.x,它运行良好,但对于 spring boot 2.3.x,它还有一些其他问题,它有内部重写和更改

  • 您需要使用ReactiveMongoTransactionManagerReactiveMongoDatabaseFactory,大部分细节在最后,还共享代码仓库

  • 为了让 mongo db 支持事务,我们需要确保数据库应该在副本模式下运行

    为什么我们需要那个?因为否则你会得到一些这样的错误:-

    此客户端连接到的 MongoDB 集群不支持会话

相同的说明如下:-

  1. 使用如下共享的 docker-compose.yml 运行基于 docker-compose 的 mongo db 服务器:-
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # need to create a docker volume named as mongodata first

Run Code Online (Sandbox Code Playgroud)
  1. 镜像出来后,执行命令(这里localmongo_docker是容器的名字):-
docker exec -it localmongo_docker mongo
Run Code Online (Sandbox Code Playgroud)
  1. 复制并粘贴下面的命令并执行
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
Run Code Online (Sandbox Code Playgroud)
  1. 然后输入exit退出执行

重要- 代码仓库可以在我的 github 上找到 - https://github.com/krnbr/mongo-spring-boot-template

代码的重要说明如下:-

  • 配置包中的MongoConfiguration类是使事务正常工作的重要部分,配置类的链接在这里

  • 主要部分是Bean

     @Bean
     ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
         return new ReactiveMongoTransactionManager(dbFactory);
     }
    
    Run Code Online (Sandbox Code Playgroud)
  • 要检查代码的事务性要求的工作情况,您可以在此处查看服务包中的 UserService 类

如果链接对某人不起作用,则共享代码:-

配置和 Beans 内部

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}
Run Code Online (Sandbox Code Playgroud)

application.properties(与mongo db相关)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
Run Code Online (Sandbox Code Playgroud)

文档类

角色等级

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

用户类

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id()
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList();

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

用户配置文件类

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }

}
Run Code Online (Sandbox Code Playgroud)

ReactiveMongoRepository 接口

角色库

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}
Run Code Online (Sandbox Code Playgroud)

用户库

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}
Run Code Online (Sandbox Code Playgroud)

用户档案库

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}
Run Code Online (Sandbox Code Playgroud)

User Service Class 这里需要创建自己的RuntimeException Class,这里是AppRuntimeException Class,我一直在用

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile'"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}
Run Code Online (Sandbox Code Playgroud)

控制器和模型类

用户请求模型类

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}
Run Code Online (Sandbox Code Playgroud)

UserProfileApisController

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}
Run Code Online (Sandbox Code Playgroud)