带反压的反应式 SQS 轮询器

use*_*815 6 java amazon-sqs project-reactor

尝试创建 SQS 轮询器:

  • 进行指数轮询(如果队列中没有消息,则减少请求数量)
  • 如果队列中有很多消息,则更频繁地查询 SQS
  • 如果收到一定数量的消息有背压,它会停止轮询
  • 不受 AWS API 速率限制的限制

作为一个例子,我正在使用这个JavaRx 实现,它很容易转换为 Project Reactor 并通过背压对其进行丰富。

private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;

public Flux<Message> pollMessages(GetQueueUrlResult q)
{
    return Flux.create(sink -> {
        long backoff = DEFAULT_BACKOFF;

        while (!stopRequested)
        {
            if (sink.isCancelled())
            {
                sink.error(new RuntimeException("Stop requested"));
                break;
            }

            Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                    ? amazonSQS.receiveMessageAsync(createRequest(q))
                    : completedFuture(new ReceiveMessageResult());

            try
            {
                ReceiveMessageResult result = future.get();

                if (result != null && !result.getMessages().isEmpty())
                {
                    backoff = DEFAULT_BACKOFF;

                    LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                    result.getMessages().forEach(m -> {
                        if (sink.requestedFromDownstream() > 0L)
                        {
                            sink.next(m);
                        }
                    });
                }
                else
                {
                    if (backoff < MAX_BACKOFF)
                    {
                        backoff = backoff * 2;
                    }

                    LOGGER.debug("No messages found on queue.  Sleeping for {} ms.", backoff);

                    // This is to prevent rate limiting by the AWS api
                    Thread.sleep(backoff);
                }
            }
            catch (InterruptedException e)
            {
                stopRequested = true;
            }
            catch (ExecutionException e)
            {
                sink.error(e);
            }
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

实施似乎有效,但有几个问题:

  • 看起来可以使用 Reactor Primitives 在循环中查询 Future 结果,尝试过Flux.generate但无法控制对 SqsClient 进行的异步调用次数
  • 如果Flux.interval方法不了解如何正确实施退避政策
  • 不喜欢Thread.sleep调用任何想法如何替换它?
  • 如果取消信号,如何正确停止循环?现在使用 Usingsink.error来涵盖这种情况。

use*_*190 1

您对以下解决方案有何看法:

    private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
       SqsAsyncClient
         .builder()
         .endpointOverride(URI.create(queueUrl))
         .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
                if (sink.isCancelled()) {
                    sink.error(new RuntimeException("Stop requested"));
                }

            scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages((int) numberOfRequests)
                            .waitTimeSeconds(waitTimeout).build();

                    CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);

                    response.thenApply(responseValue -> {
                        if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                        return responseValue;
                    });

                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
        });
Run Code Online (Sandbox Code Playgroud)