gst*_*low 7 java concurrency multithreading batching java.util.concurrent
Let's consider the following code:
Client code:
public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}
Client-side service:
import java.util.Arrays;

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    // Every single-row request becomes its own one-element batch.
    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}
Server-side service:
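import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyServerSideService {
    // Single-threaded bottleneck: synchronized, so only one batch is served at a time.
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started", batchReq);
        try {
            Thread.sleep(100); // fixed cost per call, regardless of batch size
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        } finally {
            log.info("Req for {} finished", batchReq);
        }
    }
}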
Main:
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        // 20 client threads, each issuing 100 blocking single-row requests.
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}
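According to the logs this takes about 4 min 22 sec, which is too much. I think it could be improved dramatically. I would like to implement implicit batching: MyClientSideService should collect requests and, when their number reaches 50 (the preconfigured batch size) or a preconfigured timeout expires, call MyServerSideService and route the results back to the clients. The protocol should be synchronous, so each client must block until its result is available.
I tried to write the code using CountDownLatches and CyclicBarriers, but my attempts were far from successful.
How can I achieve my goal?
If I change the return type of requestRowBatch from List<String> to Map<Integer, String>, so that the mapping of requests to responses is delegated to the server, the following works with limitations. It works only if I send <= 25 requests: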
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue<>(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean();
    // CountDownLatch is one-shot: these are never reset, so only the first batch reaches the server.
    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        // The first caller starts the collector thread; it is never started again.
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                // Wait until batchSize requests have arrived or the timeout expires.
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();
        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
        return myServerSideService.requestRowBatch(requestList);
    }
}
Based on Malt's answer, I was able to develop the following:
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair; // assumption: the original omits its imports

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture<String>>> queue = new ArrayBlockingQueue<>(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            // Flushes only when the running count hits a multiple of batchSize.
            // maxTimeoutMillis is never used, so a trailing partial batch is never sent.
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
        return myServerSideService.requestRowBatch(requestList);
    }
}
But it does not work if the number of requests is not a multiple of the batch size.
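You could use CompletableFuture.
Have the threads that call MyClientSideService put their request into a Queue (possibly a BlockingQueue) and get a new CompletableFuture in return. The calling thread can call CompletableFuture.get() to block until the result is ready, or go on doing other things.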
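As a minimal sketch of that hand-off (the class name FutureHandoffDemo, the helper awaitResult, and the value "row-42" are made up for illustration), one thread blocks on get() while another thread, standing in for the batching service, completes the future later:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureHandoffDemo {
    /** Blocks the caller until some other thread completes the future, then returns its value. */
    static String awaitResult(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Hypothetical worker standing in for the service that answers later.
        new Thread(() -> future.complete("row-42")).start();
        System.out.println(awaitResult(future)); // prints "row-42"
    }
}
```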
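That CompletableFuture is stored together with the request inside MyClientSideService. When you reach 50 requests (and therefore 50 CompletableFuture instances), have the client service send the batch request.
When the request completes, call CompletableFuture.complete(value) on each CompletableFuture instance in the queue to notify the client thread that its response is ready. This unblocks the client if it has called a blocking method such as CompletableFuture.get(), or makes get() return the value immediately if it is called later.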
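The steps above could be put together roughly as follows. This is only a sketch under stated assumptions, not a definitive implementation: the names BatchingClientSideService, Pending, drain and send are hypothetical, the backend function stands in for the Map-returning variant of MyServerSideService.requestRowBatch mentioned in the question, and the timeout is approximated with a periodic flush rather than a per-batch deadline, so a partial batch waits roughly maxWaitMillis at most before it is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch: collects single-row requests and sends them as one batch on size or timeout. */
class BatchingClientSideService implements AutoCloseable {
    /** One queued request plus the future its caller is blocked on. */
    private static final class Pending {
        final int req;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Pending(int req) { this.req = req; }
    }

    private final int batchSize;
    // Hypothetical stand-in for a Map-returning MyServerSideService::requestRowBatch.
    private final Function<List<Integer>, Map<Integer, String>> backend;
    private final Object lock = new Object();
    private List<Pending> pending = new ArrayList<>(); // guarded by lock
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "batch-flusher");
        t.setDaemon(true); // do not keep the JVM alive just for the timer
        return t;
    });

    BatchingClientSideService(int batchSize, long maxWaitMillis,
                              Function<List<Integer>, Map<Integer, String>> backend) {
        this.batchSize = batchSize;
        this.backend = backend;
        // Periodic flush: a partial batch is sent at the next tick instead of waiting forever.
        timer.scheduleWithFixedDelay(() -> send(drain(false)),
                maxWaitMillis, maxWaitMillis, TimeUnit.MILLISECONDS);
    }

    /** Blocks the calling thread until its batch has been answered. */
    public String requestSingleRow(int req) {
        Pending p = new Pending(req);
        List<Pending> full;
        synchronized (lock) {
            pending.add(p);
            full = drain(true); // non-empty only when this request filled the batch
        }
        send(full); // the server call happens outside the lock
        try {
            return p.future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Swaps out the pending list; with onlyIfFull, leaves a partial batch in place. */
    private List<Pending> drain(boolean onlyIfFull) {
        synchronized (lock) {
            if (pending.isEmpty() || (onlyIfFull && pending.size() < batchSize)) {
                return List.of();
            }
            List<Pending> batch = pending;
            pending = new ArrayList<>();
            return batch;
        }
    }

    /** Sends one batch and completes every caller's future with its own row. */
    private void send(List<Pending> batch) {
        if (batch.isEmpty()) return;
        try {
            List<Integer> reqs = batch.stream().map(p -> p.req).collect(Collectors.toList());
            Map<Integer, String> rows = backend.apply(reqs);
            batch.forEach(p -> p.future.complete(rows.get(p.req)));
        } catch (RuntimeException e) {
            batch.forEach(p -> p.future.completeExceptionally(e)); // unblock callers on failure
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Because the timer also flushes partial batches, the request count no longer has to be a multiple of the batch size. With 20 blocking client threads and a batch size of 50, a batch can never fill up, so every flush would come from the timer. In that setup a batch size at or below the number of client threads, or a short maxWaitMillis, is probably the more useful configuration. Requests still pending when close() is called are not flushed in this sketch.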