jas*_*koh 5 java amazon-web-services
我很难搞清楚如何处理来自Amazon SQS的消息.
我正在尝试实现以下内容:
令我困扰的是如何实施第2步.我有课SQSConnector和ProfileDao.现在,我想简单的实现,通过初始化SQSConnector中ProfileDao,并从队列中接收消息.我的想法是启动新线程,开始轮询消息,当队列为空时中断线程ProfileDao.
返回/处理消息的最佳方法是什么(回调函数?),如果有另一种方法可以做到这一点,我可以选择.
谢谢
我已经使用 Java 的ExecutorService、Future和ConcurrentLinkedQueue完成了与 SQS 类似的事情。
ExecutorService 创建一个线程池,可以执行实现 Callable 接口的类并返回 Future。当 ExecutorService 创建 future 时,我将它们推送到在线程中运行的 ConcurrentLinkedQueue 上,并在 future 完成时处理结果。
实现检查SQS并异步启动工作:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
}
public void waitReceive() {
// Receive a SQS message
// Start the work related to the SQS message
Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
// Send to the queue so the result can be processed when it completes
_futureResultProcessor.add(sqsFuture);
}
}
Run Code Online (Sandbox Code Playgroud)
完成这项工作的类:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
// Do work relating to the SQS message
}
}
Run Code Online (Sandbox Code Playgroud)
保存工作结果:
public class MyWorkerResult {
// Results set in MyWorker call()
}
Run Code Online (Sandbox Code Playgroud)
ConcurrentLinkedQueue 接收并处理未来的结果:
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private final Integer CHECK_SLEEP = 300;
public FutureResultProcessor() {
}
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
// There's nothing to process
try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
continue;
}
// Process result
if(myFuture != null) {
MyFutureResult myFutureResult = myFuture.get();
// Process result
}
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
Run Code Online (Sandbox Code Playgroud)
或者,您可以收集一组 future 并等待它们全部完成,然后再处理结果。
Akka可能是一个不错的选择。我没有直接使用它,但它提供了一个用于运行异步任务的框架,提供错误处理,甚至可以将任务分发到远程实例。
| 归档时间: |
|
| 查看次数: |
1770 次 |
| 最近记录: |