如何在Spring Reactor Web应用程序中执行操作序列并确保一个操作在下一个操作之前完成?

ace*_*ace 20 spring spring-boot project-reactor spring-webflux

我有Spring Boot 2网络应用程序,我需要通过cookie识别网站访问者并收集页面查看统计信息.所以我需要拦截每个Web请求.我必须编写的代码比回调地狱更复杂(Spring反应堆应该解决的问题).

这是代码:

package mypack.conf;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.http.HttpCookie;
import org.springframework.http.ResponseCookie;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import mypack.dao.PageViewRepository;
import mypack.dao.UserRepository;
import mypack.domain.PageView;
import mypack.domain.User;
import mypack.security.JwtProvider;

import reactor.core.publisher.Mono;
@Configuration


@ComponentScan(basePackages = "mypack")
@EnableReactiveMongoRepositories(basePackages = "mypack")
public class WebConfig implements WebFluxConfigurer {

    @Autowired
    @Lazy
    private UserRepository userRepository;

    @Autowired
    @Lazy
    private PageViewRepository pageViewRepository;


    @Autowired
    @Lazy
    JwtProvider jwtProvider;


    @Bean
    public WebFilter sampleWebFilter()  {
        return new WebFilter() {

            @Override
            public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

                String uri = exchange.getRequest().getURI().toString();
                String path = exchange.getRequest().getPath().pathWithinApplication().value();


                HttpCookie  cookie = null;
                String token = "";
                Map<String, List<HttpCookie>> cookies = exchange.getRequest().getCookies();


                try {
                    if((exchange.getRequest().getCookies().containsKey("_token") )
                            &&  (exchange.getRequest().getCookies().getFirst("_token"))!=null  ) {

                        cookie = exchange.getRequest().getCookies().getFirst("_token");
                        token = cookie.getValue();


                        return userRepository.findByToken(token).map(user -> {

                                exchange.getAttributes().put("_token", user.getToken());


                                PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build();
                                pageViewRepository.save(pg).subscribe(pg1 -> {user.getPageviews().add(pg1); });

                                userRepository.save(user).subscribe();
                                    return user;
                            })


                            .flatMap(user-> chain.filter(exchange)); // ultimately this step executes regardless user exist or not

                    // handle case when brand new user first time visits website    
                    } else {
                        token = jwtProvider.genToken("guest", UUID.randomUUID().toString());
                        User user = User.builder().createdDate(LocalDateTime.now()).token(token).emailId("guest").build();
                        userRepository.save(user).subscribe();
                        exchange.getResponse().getCookies().remove("_token");

                        ResponseCookie rcookie  = ResponseCookie.from("_token", token).httpOnly(true).build();
                        exchange.getResponse().addCookie(rcookie);
                        exchange.getAttributes().put("_token", token);

                    }

                } catch (Exception e) {

                    e.printStackTrace();
                }



                return chain.filter(exchange);
            } // end of  Mono<Void> filter method
        }; // end of New WebFilter (anonymous class)
    }

}
Run Code Online (Sandbox Code Playgroud)

其他相关课程:

@Repository
public interface PageViewRepository extends   ReactiveMongoRepository<PageView, String>{

    Mono<PageView> findById(String id);

}


@Repository
public interface UserRepository extends   ReactiveMongoRepository<User, String>{

    Mono<User> findByToken(String token);

}





@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class User {

    @Id
    private String id;
    private String token;


    @Default
    private LocalDateTime createdDate = LocalDateTime.now();

    @DBRef
    private List<PageView> pageviews;

}



Data
@Document
@Builder
public class PageView {
    @Id
    private String id;

    private String URL;

    @Default
    private LocalDateTime createdDate = LocalDateTime.now();
}
Run Code Online (Sandbox Code Playgroud)

gradle文件的相关部分:

buildscript {
    ext {

        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()

    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

dependencies {

    compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
    compile('org.springframework.boot:spring-boot-starter-security')
    compile('org.springframework.boot:spring-boot-starter-thymeleaf')

    compile('org.springframework.boot:spring-boot-starter-webflux')

    compile('org.springframework.security:spring-security-oauth2-client')
    compile('org.springframework.security.oauth:spring-security-oauth2:2.3.4.RELEASE')
    runtime('org.springframework.boot:spring-boot-devtools')
    compileOnly('org.projectlombok:lombok')
    compile "org.springframework.security:spring-security-jwt:1.0.9.RELEASE"
    compile "io.jsonwebtoken:jjwt:0.9.0"

    testCompile('org.springframework.boot:spring-boot-starter-test')

    testCompile('io.projectreactor:reactor-test')

    compile('com.fasterxml.jackson.core:jackson-databind')
}
Run Code Online (Sandbox Code Playgroud)

问题在于以下几点:

PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build(); pageViewRepository.save(pg).subscribe(pg1 - > {user.getPageviews().add(pg1);});

它挂起浏览器(等待响应).

基本上我想要的是:不能使用block(),它甚至不能在webfilter代码中工作,因为块也会挂起浏览器.在mongo db中保存网页浏览量.保存后,pageview具有有效的mongodb id,需要在页面视图中存储用户实体列表.因此,只有在将其保存在db中之后,下一步才是更新用户的综合浏览量列表.下一步是保存更新的用户,而不影响下游控制器方法,这些方法也可能更新用户,也可能需要保存用户.所有这些都应该在给定的WebFilter上下文中起作用.

如何解决这个问题呢?

提供的解决方案必须确保用户在传递给控制器​​操作之前保存在webfilter中,其中一些操作还可以使用查询字符串参数的不同值保存用户.

Cep*_*pr0 13

如果我理解正确,您需要异步执行数据库的长时间操作,以防止过滤器(和请求本身)阻塞?

在这种情况下,我会推荐以下对我有用的解决方案:

@Bean
public WebFilter filter() {
    return (exchange, chain) -> {
        ServerHttpRequest req = exchange.getRequest();
        String uri = req.getURI().toString();
        log.info("[i] Got request: {}", uri);

        var headers = req.getHeaders();
        List<String> tokenList = headers.get("token");

        if (tokenList != null && tokenList.get(0) != null) {
            String token = tokenList.get(0);
            log.info("[i] Find a user by token {}", token);
            return userRepo.findByToken(token)
                    .map(user -> process(exchange, uri, token, user))
                    .then(chain.filter(exchange));
        } else {
            String token = UUID.randomUUID().toString();
            log.info("[i] Create a new user with token {}", token);
            return userRepo.save(new User(token))
                    .map(user -> process(exchange, uri, token, user))
                    .then(chain.filter(exchange));
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

在这里,我略微更改您的逻辑并从适当的标头(而不是Cookie)中获取标记值,以简化我的实现.

因此,如果令牌存在,那么我们尝试找到它的用户.如果令牌不存在,那么我们创建一个新用户.如果成功找到或创建了用户,则该process方法正在调用.在那之后,无论结果如何,我们都会回来chain.filter(exchange).

该方法process使一个令牌值于该请求和呼叫异步方法的适当属性updateUserStatuserService:

private User process(ServerWebExchange exchange, String uri, String token, User user) {
    exchange.getAttributes().put("_token", token);
    userService.updateUserStat(uri, user); // async call
    return user;
}
Run Code Online (Sandbox Code Playgroud)

用户服务:

@Slf4j
@Service
public class UserService {

    private final UserRepo userRepo;
    private final PageViewRepo pageViewRepo;

    public UserService(UserRepo userRepo, PageViewRepo pageViewRepo) {
        this.userRepo = userRepo;
        this.pageViewRepo = pageViewRepo;
    }

    @SneakyThrows
    @Async
    public void updateUserStat(String uri, User user) {
        log.info("[i] Start updating...");
        Thread.sleep(1000);
        pageViewRepo.save(new PageView(uri))
                .flatMap(user::addPageView)
                .blockOptional()
                .ifPresent(u -> userRepo.save(u).block());
        log.info("[i] User updated.");
    }
}
Run Code Online (Sandbox Code Playgroud)

为了测试目的,我在这里添加了一个小延迟,以确保请求无延迟地工作,无论此方法的持续时间如何.

令牌找到用户的情况:

2019-01-06 18:25:15.442  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=1000
2019-01-06 18:25:15.443  INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 84b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:25:15.444 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:25:15.445 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:15.457  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:25:15.457  INFO 4992 --- [         task-3] : [i] Start updating...
2019-01-06 18:25:15.458 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:16.459 DEBUG 4992 --- [         task-3] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:25:16.476 DEBUG 4992 --- [         task-3] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:25:16.479  INFO 4992 --- [         task-3] : [i] User updated.
Run Code Online (Sandbox Code Playgroud)

在这里我们可以看到,task-3在用户已经获得"获取所有用户"请求的结果后,在独立线程中执行更新用户.

令牌不存在且创建用户的情况:

2019-01-06 18:33:54.764  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=763
2019-01-06 18:33:54.764  INFO 4992 --- [ctor-http-nio-3] : [i] Create a new user with token d9bd40ea-b869-49c2-940e-83f1bf79e922
2019-01-06 18:33:54.765 DEBUG 4992 --- [ctor-http-nio-3] : Inserting Document containing fields: [token, _class] in collection: user
2019-01-06 18:33:54.776  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:33:54.777  INFO 4992 --- [         task-4] : [i] Start updating...
2019-01-06 18:33:54.777 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:33:55.778 DEBUG 4992 --- [         task-4] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:33:55.792 DEBUG 4992 --- [         task-4] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:33:55.795  INFO 4992 --- [         task-4] : [i] User updated.
Run Code Online (Sandbox Code Playgroud)

令牌存在但未找到用户的情况:

2019-01-06 18:35:40.970  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=150
2019-01-06 18:35:40.970  INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 184b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:35:40.977  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:35:40.978 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
Run Code Online (Sandbox Code Playgroud)

我的演示项目:sb-reactive-filter-demo


小智 0

为了收集页面浏览统计数据,我建议更改策略并使用 Actuator 和 Micrometer 代替:

  1. 将执行器依赖项添加到您的项目中
  2. 公开相关端点(此处,metrics
  3. 转至/actuator/metrics服务器 HTTP 请求并选择指标(请参阅参考文档)。

Micrometer 提供了更多功能,可帮助您获得正确的指标,例如:在测量时间时考虑 GC 暂停、提供直方图/百分位数/...等等。