在Java中异步处理队列中的Amazon SQS消息

jas*_*koh 5 java amazon-web-services

我很难搞清楚如何处理来自Amazon SQS的消息.

我正在尝试实现以下内容:

  1. SQS的听众
  2. 处理来自队列的消息并将其添加到DB
  3. 从队列中删除已处理的消息

令我困扰的是如何实施第2步.我有课SQSConnectorProfileDao.现在,我想简单的实现,通过初始化SQSConnectorProfileDao,并从队列中接收消息.我的想法是启动新线程,开始轮询消息,当队列为空时中断线程ProfileDao.

返回/处理消息的最佳方法是什么(回调函数?),如果有另一种方法可以做到这一点,我可以选择.

谢谢

Hyp*_*eXR 3

我已经使用 Java 的ExecutorServiceFutureConcurrentLinkedQueue完成了与 SQS 类似的事情。

ExecutorService 创建一个线程池,可以执行实现 Callable 接口的类并返回 Future。当 ExecutorService 创建 future 时,我将它们推送到在线程中运行的 ConcurrentLinkedQueue 上,并在 future 完成时处理结果。

实现检查SQS并异步启动工作:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SqsProcessor {

    private static final int THREAD_COUNT = 100;
    private ExecutorService _executor = null;
    private FutureResultProcessor futureResultProcessor = null;
    
    public SqsProcessor() {
        _executor = Executors.newFixedThreadPool(THREAD_COUNT);
        _futureResultProcessor = new FutureResultProcessor();
    }

    public void waitReceive() {

        // Receive a SQS message

        // Start the work related to the SQS message
        Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
        Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);

        // Send to the queue so the result can be processed when it completes
        _futureResultProcessor.add(sqsFuture);
    }
}
Run Code Online (Sandbox Code Playgroud)

完成这项工作的类:

import java.util.concurrent.Callable;

public class MyWorker implements Callable<MyWorkerResult> {

    private String _sqsMessage = null;

    public MyWorker(String sqsMessage) {
        _sqsMessage = sqsMessage;
    }

    @Override
    public MyWorkerResult call() throws Exception {
        // Do work relating to the SQS message
    }
}
Run Code Online (Sandbox Code Playgroud)

保存工作结果:

public class MyWorkerResult {
    // Results set in MyWorker call()
}
Run Code Online (Sandbox Code Playgroud)

ConcurrentLinkedQueue 接收并处理未来的结果:

import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FutureResultProcessor extends Thread {

    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
    private final Integer CHECK_SLEEP = 300;

    public FutureResultProcessor() {
    }

    public void run() {
        while(true) {
            Future<MyWorkerResult> myFuture = resultQueue.poll();

            if(myFuture == null) {
                // There's nothing to process
                try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
                continue;
            }

            // Process result
            if(myFuture != null) {

                MyFutureResult myFutureResult = myFuture.get();

                // Process result
            }
        }
    }

    public void add(Future<MyWorkerResult> sqsFuture) {
        resultQueue.offer(sqsFuture);
    }
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以收集一组 future 并等待它们全部完成,然后再处理结果。

Akka可能是一个不错的选择。我没有直接使用它,但它提供了一个用于运行异步任务的框架,提供错误处理,甚至可以将任务分发到远程实例。