如何使用 Jersey Client 2.2.x 取消挂起的异步请求并取消注册调用回调?

oos*_*sie 5 java rest asynchronous jersey jersey-client

我有一个带有 sleep 方法的简单 REST 服务,该方法除了在指定的毫秒时间内睡眠之外不做任何事情,然后返回无内容响应。我的 RESTTest 类尝试首先调用http://localhost:8080/myapp/rest/sleep/7500(休眠 7.5 秒),但只等待 5 秒。5 秒后,它取消接收到的 Future(尝试取消挂起的请求)并调用http://localhost:8080/myapp/rest/sleep/5000(休眠 5 秒)并等待 5 秒。

public class RESTTest {
    private final Client client = ClientBuilder.newClient();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition responseReceived = lock.newCondition();

    public static void main(final String... arguments) {
        new RESTTest().listen(10000);
    }

    public void listen(final long time) {
        System.out.println("Listen for " + time + " ms.");
        Future<Response> _response =
            client.
                target("http://localhost:8080/myapp/rest/sleep/" + time)).
                request().
                async().
                    get(
                        new InvocationCallback<Response>() {
                            public void completed(final Response response) {
                                System.out.println("COMPLETED");
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }

                            public void failed(final Throwable throwable) {
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
                        });
        lock.lock();
        try {
            System.out.println("Waiting for 5000 ms.");
            if (!responseReceived.await(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out!");
                _response.cancel(true);
                listen(5000);
            } else {
                System.out.println("Response received.");
            }
        } catch (final InterruptedException exception) {
            // Do nothing.
        } finally {
            lock.unlock();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我希望看到“COMPLETED”字符串仅打印一次,并且“收到响应”。字符串也只打印一次。然而,“COMPLETED”字符串被打印两次!

Listen for 7500 ms.
Waiting for 5000 ms.
Timed out!
Listen for 5000 ms.
Waiting for 5000 ms.
COMPLETED
Response received.
COMPLETED
Run Code Online (Sandbox Code Playgroud)

我在这里缺少什么?

谢谢,

Chr*_*haw 0

我相信您已经明白了,但这是一个非常模块化的解决方案,您可以将其与简单的 Guava ListenableFuture 一起使用。当然,您不必像我在 Futures.allAsList 中所做的那样汇集响应,但您可以在最后执行类似的操作并删除 CountDownLatch。

顺便说一句,我很确定你的问题是线程问题。您看到 COMPLETED 是因为回调是在您下次调用 Listen(5000) 后调用的。请记住,异步将被线程化,因此到控制台的输出可能会延迟到下一次上下文切换。服务器可能在 7500 信号量解锁后立即响应。

private Client client;

@Before
public void setup() {
    final ClientConfig clientConfig = new ClientConfig();
    clientConfig.register(OrtbBidRequestBodyReader.class);
    clientConfig.register(OrtbBidRequestBodyWriter.class);
    clientConfig.connectorProvider(new CachingConnectorProvider(new HttpUrlConnectorProvider()));
    clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 3);
    client = ClientBuilder.newClient(clientConfig);
}

@Test
public void testAsync() throws InterruptedException, ExecutionException, JsonProcessingException {

    final WebTarget target = client
            .target("http://localhost:8081/dsp-receiver-0.0.1-SNAPSHOT/ortb/bid/123123?testbid=bid");

    final AtomicInteger successcount = new AtomicInteger();
    final AtomicInteger noBid = new AtomicInteger();
    final AtomicInteger clientError = new AtomicInteger();

    final InvocationCallback<Response> callback = new InvocationCallback<Response>() {
        @Override
        public void completed(final Response response) {
            if (response.getStatus() == 200) {
                successcount.incrementAndGet();
            } else if (response.getStatus() == 204) {
                noBid.incrementAndGet();
            } else {
                clientError.incrementAndGet();
            }
        }

        @Override
        public void failed(final Throwable e) {
            clientError.incrementAndGet();
            logger.info("Client Error", e);
        }
    };

    final Entity<OrtbBidRequest> entity = Entity.entity(testBidRequest, MediaType.APPLICATION_JSON);
    final List<ListenableFuture<Response>> allFutures = Lists.newArrayList();
    final Stopwatch stopwatch = Stopwatch.createStarted();
    for (int i = 0; i < 100000; i++) {
        logger.info("Running request {}", i);
        final Future<Response> future = target.request().accept(MediaType.APPLICATION_JSON).async().post(entity,
                callback);
        final ListenableFuture<Response> response = JdkFutureAdapters.listenInPoolThread(future);
        allFutures.add(response);

        // For each 100 of requests we will wait on them, otherwise we
        // may run out of memory. This is really just to test the stamina
        // of the dsp
        if (i % 200 == 0) {
            Futures.allAsList(allFutures).get();
            allFutures.clear();
        }
    }

    logger.info("success count {}  nobid {} client error {} ", successcount, noBid, clientError);
    logger.info("Total time {} ms ", stopwatch.stop());
}
Run Code Online (Sandbox Code Playgroud)