use*_*815 6 java amazon-sqs project-reactor
I am trying to create an SQS poller.
As an example, I am using this RxJava implementation, which converts easily to Project Reactor, and enriching it with backpressure:
    private static final Long DEFAULT_BACKOFF = 500L;
    private static final Long MAX_BACKOFF = 8000L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
    private static volatile boolean stopRequested;

    public Flux<Message> pollMessages(GetQueueUrlResult q)
    {
        return Flux.create(sink -> {
            long backoff = DEFAULT_BACKOFF;

            while (!stopRequested)
            {
                if (sink.isCancelled())
                {
                    sink.error(new RuntimeException("Stop requested"));
                    break;
                }

                Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                        ? amazonSQS.receiveMessageAsync(createRequest(q))
                        : completedFuture(new ReceiveMessageResult());

                try
                {
                    ReceiveMessageResult result = future.get();

                    if (result != null && !result.getMessages().isEmpty())
                    {
                        backoff = DEFAULT_BACKOFF;

                        LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                        result.getMessages().forEach(m -> {
                            if (sink.requestedFromDownstream() > 0L)
                            {
                                sink.next(m);
                            }
                        });
                    }
                    else
                    {
                        if (backoff < MAX_BACKOFF)
                        {
                            backoff = backoff * 2;
                        }

                        LOGGER.debug("No messages found on queue. Sleeping for {} ms.", backoff);

                        // This is to prevent rate limiting by the AWS api
                        Thread.sleep(backoff);
                    }
                }
                catch (InterruptedException e)
                {
                    stopRequested = true;
                }
                catch (ExecutionException e)
                {
                    sink.error(e);
                }
            }
        });
    }
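The backoff rule in the loop above (double the delay after an empty poll, reset it once messages arrive) can be pulled out into a small helper so it can be checked apart from the SQS call. This is only a minimal sketch, assuming the same 500 ms and 8000 ms bounds. The `Backoff` class and its method names are hypothetical, and it clamps with `Math.min` rather than the `if (backoff < MAX_BACKOFF)` test, which gives the same sequence for these bounds:

```java
/** Hypothetical helper (not part of the original code): capped exponential backoff. */
final class Backoff {
    private final long initialMillis;
    private final long maxMillis;
    private long currentMillis;

    Backoff(long initialMillis, long maxMillis) {
        if (initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("need 0 < initialMillis <= maxMillis");
        }
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
        this.currentMillis = initialMillis;
    }

    /** Call after an empty poll: doubles the delay, clamps it to the cap, returns it. */
    long onEmpty() {
        currentMillis = Math.min(currentMillis * 2, maxMillis);
        return currentMillis;
    }

    /** Call after messages were received: the next empty poll starts over. */
    void onMessages() {
        currentMillis = initialMillis;
    }

    public static void main(String[] args) {
        Backoff backoff = new Backoff(500L, 8000L);
        for (int i = 0; i < 6; i++) {
            System.out.println(backoff.onEmpty());
        }
    }
}
```

With these bounds, successive empty polls wait 1000, 2000, 4000 and 8000 ms, and then stay at 8000 ms until `onMessages()` resets the delay.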
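The implementation seems to work, but there are a few questions:
- The polling could be done with Flux.generate, but then there is no control over the number of async calls made to the SqsClient.
- With the Flux.interval approach, I do not understand how to implement the backoff policy properly.
- I do not like the Thread.sleep call. Any ideas how to replace it?
- sink.error is currently used to cover the case where the sink is cancelled.

What do you think about the following solution: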
    private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
        SqsAsyncClient
            .builder()
            .endpointOverride(URI.create(queueUrl))
            .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
            if (sink.isCancelled()) {
                sink.error(new RuntimeException("Stop requested"));
            }

            scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages((int) numberOfRequests)
                        .waitTimeSeconds(waitTimeout).build();

                    CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);

                    response.thenApply(responseValue -> {
                        if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                        return responseValue;
                    });
                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
        });
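The scheduled version above polls at a fixed interval, so the exponential backoff from the question is lost. One possible way to keep it without `Thread.sleep` is a task that re-schedules itself with a delay it computes after each poll. The sketch below uses only the JDK and is not tied to SQS or Reactor. The `BackoffPoller` class is hypothetical, `poll` is a stand-in for the receive call (assumed here to be synchronous and to return an empty list when the queue is empty), and `onMessage` is a stand-in for `sink::next`. Error handling is left out: an exception thrown by `poll` simply ends the chain.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch: a poller that re-schedules itself with capped exponential backoff. */
final class BackoffPoller<T> implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Supplier<List<T>> poll;   // stand-in for the SQS receive call
    private final Consumer<T> onMessage;    // stand-in for sink::next
    private final long initialDelayMs;
    private final long maxDelayMs;

    BackoffPoller(Supplier<List<T>> poll, Consumer<T> onMessage,
                  long initialDelayMs, long maxDelayMs) {
        this.poll = poll;
        this.onMessage = onMessage;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Schedules the first poll immediately. */
    void start() {
        submit(initialDelayMs, 0L);
    }

    private void submit(long backoffMs, long delayMs) {
        try {
            scheduler.schedule(() -> pollOnce(backoffMs), delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // close() shut the scheduler down concurrently; nothing left to do.
        }
    }

    private void pollOnce(long backoffMs) {
        if (stopped.get()) {
            return;
        }
        List<T> batch = poll.get();
        if (batch.isEmpty()) {
            // Empty poll: double the delay up to the cap and wait that long.
            long next = Math.min(backoffMs * 2, maxDelayMs);
            submit(next, next);
        } else {
            // Messages found: deliver them, reset the backoff, poll again right away.
            batch.forEach(onMessage);
            submit(initialDelayMs, 0L);
        }
    }

    /** Stops polling; no further polls are scheduled after this returns. */
    @Override
    public void close() {
        stopped.set(true);
        scheduler.shutdownNow();
    }
}
```

Inside `Flux.create`, the consumer could be `sink::next` and `close()` could be registered through `sink.onDispose(...)`, so that cancellation stops the polling instead of relying on `sink.error`. Honoring `sink.requestedFromDownstream()` before each poll, as both versions above do, would still have to be added.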