Dropwizard中的JAX-RS:处理异步调用并立即响应

ana*_*and 4 java dropwizard jersey-2.0

我有一个资源Class,@ManagedAsync方法类看起来像这样:

@Path("my-resource")
public class MyResource extends BaseResource{

    private DatumDAO datumDAO;

    public MyResource(DatumDAO datumDAO){
        this.datumDAO = datumDAO;
    }

    public void cleanDatum(Datum datum){
       //time taking operations
    }

    @GET
    @ManagedAsync
    @Path("/cleanup/{from}/{till}/")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    @UnitOfWork
    public void cleanupDirtyData(@Suspended final AsyncResponse asyncResponse, @PathParam("from") DateTimeParam from,
            @PathParam("till") DateTimeParam till) throws IOException{

        logger.debug("will try to cleanup dirty data in range: " + from + " " +  till);
        List<Datum> data = datumDAO.getALlDirtyDatumInRange(from.get().toDate(), till.get().toDate());
        Map<Long,String> cleanupMap = new HashMap<Long,String>();
        for(Datum datum: data){
            cleanDatum(datum);
            cleanupMap.put(datum.getId(), "cleaned");
        }
        // this response need to be sent [can be ignored]       
        asyncResponse.resume(Response.status(HttpStatus.OK_200).entity(cleanupMap).build());

    }

}
Run Code Online (Sandbox Code Playgroud)

由于调用cleanupDirtyData可能需要一段时间,我不希望客户端完全等待它,我知道执行工作被卸载到不同的工作线程.

我想要实现的是立即响应客户端并继续cleanupDirtyData异步执行该函数.

所以尝试了以下的事情:

施加强制超时,并给客户端过早响应,但这似乎不是理想的方式,它会停止执行.

看起来像这样:

@Path("my-resource")
public class MyResource extends BaseResource{

    private DatumDAO datumDAO;

    public MyResource(DatumDAO datumDAO){
        this.datumDAO = datumDAO;
    }

    public void cleanDatum(Datum datum){
       //time taking operations
    }

    @GET
    @ManagedAsync
    @Path("/cleanup/{from}/{till}/")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    @UnitOfWork
    public void cleanupDirtyData(@Suspended final AsyncResponse asyncResponse, @PathParam("from") DateTimeParam from,
            @PathParam("till") DateTimeParam till) throws IOException{

        // Register handler and set timeout
        asyncResponse.setTimeoutHandler(new TimeoutHandler() {
            public void handleTimeout(AsyncResponse ar) {
                asyncResponse.resume(Response.status(SERVICE_UNAVAILABLE).entity(
                    "Operation timed out -- please try again").build());                    
                }
        });
        ar.setTimeout(15, TimeUnit.SECONDS);      

        logger.debug("will try to cleanup dirty data in range: " + from + " " +  till);
        List<Datum> data = datumDAO.getALlDirtyDatumInRange(from.get().toDate(), till.get().toDate());
        Map<Long,String> cleanupMap = new HashMap<Long,String>();
        for(Datum datum: data){
            cleanDatum(datum);
            cleanupMap.put(datum.getId(), "cleaned");
        }
       // this response need to be sent [can be ignored]              
        asyncResponse.resume(Response.status(HttpStatus.OK_200).entity(cleanupMap).build());

    }

}
Run Code Online (Sandbox Code Playgroud)

cas*_*lin 6

JAX-RS异步服务器API是所有关于如何容器将要管理的要求.但它仍将保留请求,不会影响客户体验.

引用有关异步服务器API的Jersey文档:

请注意,使用服务器端异步处理模型不会改善客户端感知的请求处理时间.然而,通过将初始请求处理线程释放回I/O容器,同时请求可能仍在队列中等待处理或者处理可能仍在另一个专用线程上运行,它将增加服务器的吞吐量.已发布的I/O容器线程可用于接受和处理新的传入请求连接.

如果您想立即给客户回复,您可能正在寻找以下内容:

@Singleton
@Path("expensive-task")
public class ExpensiveTaskResource {

    private ExecutorService executor;

    private Future<String> futureResult;

    @PostConstruct
    public void onCreate() {
        this.executor = Executors.newSingleThreadExecutor();
    }

    @POST
    public Response startTask() {
        futureResult = executor.submit(new ExpensiveTask());
        return Response.status(Status.ACCEPTED).build();
    }

    @GET
    public Response getResult() throws ExecutionException, InterruptedException {
        if (futureResult != null && futureResult.isDone()) {
            return Response.status(Status.OK).entity(futureResult.get()).build();
        } else {
            return Response.status(Status.FORBIDDEN).entity("Try later").build();
        }
    }

    @PreDestroy
    public void onDestroy() {
        this.executor.shutdownNow();
    }
}
Run Code Online (Sandbox Code Playgroud)
public class ExpensiveTask implements Callable<String> {

    @Override
    public String call() throws Exception {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "Task completed";
    }
}
Run Code Online (Sandbox Code Playgroud)

在servlet容器中,您可以使用a ExecutorService来运行昂贵的任务.在Java EE容器中,您应该考虑使用ManagedExecutorService.