How to limit requests per second with WebClient?

Pho*_*ste 4 spring project-reactor spring-webflux

I'm using a WebClient object to send HTTP POST requests to a server. It sends a large number of requests very quickly (there are about 4000 messages in a QueueChannel). The problem is that the server apparently can't respond fast enough, so I get a lot of 500 server errors and prematurely closed connections.

Is there a way to limit the number of requests per second? Or to limit the number of threads it uses?

Edit:

The message endpoint that processes messages from the QueueChannel:

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

The WebClient service class:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

Mar*_*nyi 7

Resilience4j has good support for non-blocking rate limiting with Project Reactor.

Required dependencies (in addition to Spring WebFlux):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.6.1</version>
</dependency>

Example:

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class WebClientRateLimit
{
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private final WebClient webClient;
    private final RateLimiter rateLimiter;

    public WebClientRateLimit()
    {
        this.webClient = WebClient.create();

        // enables 3 requests every 5 seconds
        this.rateLimiter = RateLimiter.of("my-rate-limiter",
                RateLimiterConfig.custom()
                                 .limitRefreshPeriod(Duration.ofSeconds(5))
                                 .limitForPeriod(3)
                                 .timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
                                 .build());
    }

    public Mono<?> call()
    {
        return webClient.get()
                        .uri("https://jsonplaceholder.typicode.com/todos/1")
                        .retrieve()
                        .bodyToMono(String.class)
                        .doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
                                + " - call triggered"))
                        .transformDeferred(RateLimiterOperator.of(rateLimiter));
    }

    public static void main(String[] args)
    {
        WebClientRateLimit webClientRateLimit = new WebClientRateLimit();

        long start = System.currentTimeMillis();

        Flux.range(1, 16)
            .flatMap(x -> webClientRateLimit.call())
            .blockLast();

        System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
    }
}

Example output:

1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096


Documentation: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter


Bar*_*cki 5

A question about limiting request rate with Reactor offers two answers (one of them in the comments):

zip with another Flux that acts as the rate limiter

.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))

or just delay each web request

using the delayElements function
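A minimal runnable sketch of both approaches, assuming reactor-core is on the classpath; `callWebClient` and the class name are hypothetical stand-ins for the actual WebClient call, not part of the original answer:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class ReactorThrottleSketch {

    // Hypothetical stand-in for the real WebClient call
    static Mono<String> callWebClient(int id) {
        return Mono.just("response-" + id);
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3);

        // Approach 1: zip the request flux with an interval flux.
        // Each element is only emitted when its interval tick arrives,
        // so at most one request goes out per second.
        List<String> zipped = Flux.fromIterable(ids)
                .zipWith(Flux.interval(Duration.ofSeconds(1)))
                .concatMap(tuple -> callWebClient(tuple.getT1()))
                .collectList()
                .block();
        System.out.println(zipped);

        // Approach 2: delayElements spaces out the emissions directly.
        List<String> delayed = Flux.fromIterable(ids)
                .delayElements(Duration.ofSeconds(1))
                .concatMap(ReactorThrottleSketch::callWebClient)
                .collectList()
                .block();
        System.out.println(delayed);
    }
}
```

Note that both variants only space out *subscriptions* at the source; with `flatMap` instead of `concatMap`, slow responses could still pile up downstream.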

Edit: the answer below is valid for a blocking RestTemplate, but it does not really fit the reactive pattern.

WebClient has no built-in capability to limit requests, but you can easily add this feature through composition.

You can throttle your client externally using Guava's RateLimiter (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html).

In the tutorial http://www.baeldung.com/guava-rate-limiter you will find how to use the RateLimiter in a blocking way or with timeouts.

I would decorate all the calls that need throttling in a separate class that:

  1. limits the number of calls per second
  2. performs the actual web call using WebClient
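A sketch of that decorator, assuming Guava is on the classpath and reusing the `IHttpService` and `Data` types from the question; the class name and the 50-permits-per-second figure are illustrative, not from the original answer:

```java
import com.google.common.util.concurrent.RateLimiter;

// Decorator that throttles calls before delegating to the real WebClient-based service
public class ThrottledHttpService implements IHttpService {

    // allow at most 50 requests per second (illustrative value)
    private final RateLimiter rateLimiter = RateLimiter.create(50.0);

    private final IHttpService delegate;

    public ThrottledHttpService(IHttpService delegate) {
        this.delegate = delegate;
    }

    @Override
    public void push(Data data) {
        rateLimiter.acquire(); // blocks until a permit is available
        delegate.push(data);   // the actual web call via WebClient
    }
}
```

Because `acquire()` blocks the calling thread, this fits the poller-driven `CustomServiceActivator` from the question better than a purely reactive pipeline.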


ram*_*zag 5

I hope I'm not late to the party. Anyway, limiting the request rate was just one of the problems I faced a week ago while building a crawler. Here were the issues:

  1. I had to make recursive, paginated, sequential requests. The pagination parameters are part of the API I was calling.
  2. After a response is received, pause for 1 second before making the next request.
  3. Retry on certain errors.
  4. On retry, pause for a few seconds.

Here is the solution:

private Flux<HostListResponse> sequentialCrawl() {
    AtomicLong pageNo = new AtomicLong(2);
    // Solution for #1 - Flux.expand
    return getHosts(1)
        .doOnRequest(value -> LOGGER.info("Start crawling."))
        .expand(hostListResponse -> { 
            final long totalPages = hostListResponse.getData().getTotalPages();
            long currPageNo = pageNo.getAndIncrement();
            if (currPageNo <= totalPages) {
                LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
                // Solution for #2
                return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
                    getHosts(currPageNo)
                );
            }
            return Flux.empty();
        })
        .doOnComplete(() -> LOGGER.info("End of crawling."));
}

private Mono<HostListResponse> getHosts(long pageNo) {
    final String uri = hostListUrl + pageNo;
    LOGGER.info("Crawling " + uri);

    return webClient.get()
        .uri(uri)
        .exchange()
        // Solution for #3
        .retryWhen(companion -> companion
            .zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
                String message = "Failed to crawl uri: " + error.getMessage();
                if (index <= RETRY && (error instanceof RequestIntervalTooShortException
                    || error instanceof ConnectTimeoutException
                    || "Connection reset by peer".equals(error.getMessage())
                )) {
                    LOGGER.info(message + ". Retries count: " + index);
                    return Tuples.of(error, index);
                } else {
                    LOGGER.warn(message);
                    throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
                }
            })
            .map(tuple -> {
                // Solution for #4
                Throwable e = tuple.getT1();
                int delaySeconds = tuple.getT2();
                // TODO: Adjust these values according to your needs
                if (e instanceof ConnectTimeoutException) {
                    delaySeconds = delaySeconds * 5;
                } else if ("Connection reset by peer".equals(e.getMessage())) {
                    // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
                    delaySeconds = delaySeconds * 10;
                }
                LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
                return Mono.delay(Duration.ofSeconds(delaySeconds));
            })
            .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
        )
        .flatMap(clientResponse -> clientResponse.toEntity(String.class))
        .map(responseEntity -> {
            HttpStatus statusCode = responseEntity.getStatusCode();
            if (statusCode != HttpStatus.OK) {
                Throwable exception;
                // Convert json string to Java POJO
                HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
                // The API that I'm calling will return error code of 06 if request interval is too short
                if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
                    exception = new RequestIntervalTooShortException(uri);
                } else {
                    exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
                }
                throw Exceptions.propagate(exception);
            } else {
                return toHostListResponse(uri, statusCode, responseEntity.getBody());
            }
        });
}


Views: 3013
Last activity: 6 years, 3 months ago