How do I add implicit batching to a client?

gst*_*low 7 java concurrency multithreading batching java.util.concurrent

Let's consider the following code:

Client code:

public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}

Client-side service:

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}

Server-side service:

@Slf4j
public class MyServerSideService {
    //single threaded bottleneck service
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started", batchReq);
        try {
            Thread.sleep(100);
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            return null;
        } finally {
            log.info("Req for {} finished", batchReq);

        }
    }
}

Main:

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}

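According to the logs, this takes about 4 minutes 22 seconds, which is too long. I think it could be improved significantly. I would like to implement implicit batching: MyClientSideService should collect requests and, once 50 of them have accumulated (the preconfigured batch size) or a preconfigured timeout expires, send them to MyServerSideService and route the results back to the clients. The protocol must be synchronous, so each client has to block until it gets its result.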
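A rough back-of-the-envelope estimate of what batching could buy here, assuming the 100 ms sleep in the server dominates and everything else is free. The class and method names are made up for illustration. Because each of the 20 client threads blocks until it has its answer, at most 20 requests can be waiting at any moment, so the effective batch size in this test is assumed to be min(50, 20) = 20 rather than 50.

```java
class BatchingEstimate {
    /** Total time if every request is sent as its own server call. */
    static long unbatchedMillis(int totalRequests, long millisPerCall) {
        return totalRequests * millisPerCall;
    }

    /** Total time if requests are grouped into server calls of up to effectiveBatch each. */
    static long batchedMillis(int totalRequests, int effectiveBatch, long millisPerCall) {
        long calls = (totalRequests + effectiveBatch - 1) / effectiveBatch; // ceiling division
        return calls * millisPerCall;
    }

    public static void main(String[] args) {
        int threads = 20, requestsPerThread = 100, batchSize = 50;
        int total = threads * requestsPerThread;
        // Blocking clients can never have more than `threads` requests outstanding.
        int effectiveBatch = Math.min(batchSize, threads);
        System.out.println(unbatchedMillis(total, 100));               // 200000
        System.out.println(batchedMillis(total, effectiveBatch, 100)); // 10000
    }
}
```

This ignores the time spent waiting for the timeout. Since a batch of 50 can never fill up with 20 blocking threads, every batch in this test would be closed by the timeout, so the timeout value adds to each round.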

I tried to write this using CountDownLatch and CyclicBarrier, but my attempts were far from successful.

How can I achieve my goal?

P.S.

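If the return type of requestRowBatch is changed from List<String> to Map<Integer, String>, so that mapping requests to responses is delegated to the server, the following works, with a limitation: it only works if I send <= 25 requests.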

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue<>(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean();

    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);


    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();

        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}
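The attempt above calls a Map-returning requestRowBatch that is not shown in the question. A minimal sketch of what that server-side variant might look like, assuming it keeps the same single-threaded 100 ms behavior; the class name MapReturningServerSideService is hypothetical, and logging is left out to keep the sketch dependency-free.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

class MapReturningServerSideService {
    // Single-threaded bottleneck: one batch at a time, 100 ms per batch.
    public synchronized Map<Integer, String> requestRowBatch(Collection<Integer> batchReq) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while serving batch", e);
        }
        // Key each response by its request so the client can route it back.
        Map<Integer, String> result = new LinkedHashMap<>();
        for (Integer req : batchReq) {
            result.put(req, String.valueOf(req));
        }
        return result;
    }
}
```

Keying by request value means duplicate requests in one batch share a single map entry, which is harmless here because equal requests get equal responses.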

Update

Based on Malt's answer, I was able to come up with the following:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture<String>>> queue = new ArrayBlockingQueue<>(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

However, it does not work if the number of requests is not a multiple of the batch size.

Mal*_*alt 0

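You can use CompletableFuture.
Have the threads that call MyClientSideService put their request into a Queue (possibly a BlockingQueue) and get a new CompletableFuture in return. The calling thread can call CompletableFuture.get() to block until the result is ready, or go on doing other things.

That CompletableFuture is stored together with the request in MyClientSideService. When you reach 50 requests (and therefore 50 CompletableFuture instances), have the client service send the batch request.

When the request completes, call CompletableFuture.complete(value) on each CompletableFuture instance in the queue to notify the client threads that their responses are ready. This unblocks a client that has already called a blocking method such as CompletableFuture.get(), or makes get() return the value immediately if it is called later.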
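The approach described above can be sketched roughly as follows, with one addition that is not part of this answer: a timeout flush, so that a batch which never fills up is still sent. This is a minimal sketch under assumptions, not a definitive implementation. The class name ImplicitBatcher and the Pending helper are made up, and the server is passed in as a Function standing in for a Map-returning requestRowBatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class ImplicitBatcher {
    /** One queued request plus the future its caller is blocked on. */
    private static final class Pending {
        final int req;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Pending(int req) { this.req = req; }
    }

    private final int batchSize;
    private final long maxWaitMillis;
    // Assumption: stands in for a Map-returning requestRowBatch on the server.
    private final Function<List<Integer>, Map<Integer, String>> server;
    private final ScheduledExecutorService flusher =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "batch-flusher");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private final Object lock = new Object();
    private List<Pending> pending = new ArrayList<>(); // guarded by lock

    ImplicitBatcher(int batchSize, long maxWaitMillis,
                    Function<List<Integer>, Map<Integer, String>> server) {
        this.batchSize = batchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.server = server;
    }

    /** Blocks the caller until its batch has been answered. */
    public String requestSingleRow(int req) {
        Pending mine = new Pending(req);
        List<Pending> full = null;
        synchronized (lock) {
            pending.add(mine);
            if (pending.size() == 1) {
                // First request of a new batch starts the timeout clock.
                List<Pending> thisBatch = pending;
                flusher.schedule(() -> flushIfStillPending(thisBatch),
                        maxWaitMillis, TimeUnit.MILLISECONDS);
            }
            if (pending.size() >= batchSize) {
                full = pending;              // batch is full: take it
                pending = new ArrayList<>(); // and start a new one
            }
        }
        if (full != null) {
            send(full); // call the server outside the lock
        }
        return mine.future.join();
    }

    private void flushIfStillPending(List<Pending> batch) {
        synchronized (lock) {
            if (pending != batch) {
                return; // already sent because it filled up
            }
            pending = new ArrayList<>();
        }
        send(batch);
    }

    private void send(List<Pending> batch) {
        try {
            List<Integer> reqs = batch.stream().map(p -> p.req).collect(Collectors.toList());
            Map<Integer, String> responses = server.apply(reqs);
            for (Pending p : batch) {
                p.future.complete(responses.get(p.req));
            }
        } catch (RuntimeException e) {
            // Fail every waiting caller instead of leaving them blocked forever.
            for (Pending p : batch) {
                p.future.completeExceptionally(e);
            }
        }
    }
}
```

Each batch is an independent list, so a late timer for a batch that was already sent sees a different list in `pending` and does nothing. Note that with 20 blocking client threads a batch of 50 can never fill, so in a test like the one in the question every batch would be closed by the timeout; a short maxWaitMillis (a few milliseconds rather than seconds) is probably what you want there.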