mar*_*her 8 java performance asynchronous priority-queue completable-future
TL;DR:当有几个CompletableFutures 等待执行时,我如何优先考虑那些我感兴趣的值?
我有一个 10,000CompletableFuture秒的列表(计算产品数据库内部报告的数据行):
List<Product> products = ...;
List<CompletableFuture<DataRow>> dataRows = products
.stream()
.map(p -> CompletableFuture.supplyAsync(() -> calculateDataRowForProduct(p), singleThreadedExecutor))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
每个都需要大约50 毫秒才能完成,所以整个事情在500 秒内完成。(它们都共享相同的数据库连接,因此不能并行运行)。
假设我想访问第 9000 个产品的数据行:
dataRows.get(9000).join()
问题是,所有这些 CompletableFuture 都是按照它们被创建的顺序执行的,而不是按照它们被访问的顺序。这意味着我必须等待450 秒才能计算出我目前不关心的内容,最终到达我想要的数据行。
问:有什么办法改变这种行为,使期货我尝试访问GET优先对那些我不关心的时刻?
第一个想法:
我注意到 aThreadPoolExecutor使用 aBlockingQueue<Runnable>来排队等待可用线程的条目。
因此,我考虑使用PriorityBlockingQueue, 来更改Runnable访问它时的优先级,CompletableFuture但是:
PriorityBlockingQueue没有方法重新排列现有元素的优先级,并且CompletableFuture到Runnable队列中的相应条目。在我沿着这条路走得更远之前,您认为这听起来是正确的方法吗?其他人有过这种要求吗?我试图搜索它,但一无所获。也许CompletableFuture不是这样做的正确方法?
背景:我们有一个内部报告,每页显示 100 个产品。最初我们预先计算了报告的所有数据行,如果有人拥有那么多产品,这会花费很长时间。
所以第一个优化是将计算包装在一个记忆化的供应商中:
List<Supplier<DataRow>> dataRows = products
.stream()
.map(p -> Suppliers.memoize(() -> calculateDataRowForProduct(p)))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
这意味着,第一100个条目的初始显示现在只需5秒,而不是500秒(这是很大的),但是,当用户切换到下一个页面,它需要另一个5秒为他们中的每一个。
所以这个想法是,当用户盯着第一个屏幕时,为什么不在后台预先计算下一页。这让我想到了上面的问题。
有趣的问题:)
一种方法是推出自定义FutureTask类以方便动态更改任务的优先级。
DataRow为简单起见,此处将两者Product视为String。
import java.util.*;
import java.util.concurrent.*;
public class Testing {
private static String calculateDataRowForProduct(String product) {
try {
// Dummy operation.
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Computation done for " + product);
return "data row for " + product;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
PriorityBlockingQueue<Runnable> customQueue = new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator());
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, customQueue);
List<String> products = new ArrayList<>();
for (int i = 0; i < 10; i++) {
products.add("product" + i);
}
Map<Integer, PrioritizedFutureTask<String>> taskIndexMap = new HashMap<>();
for (int i = 0; i < products.size(); i++) {
String product = products.get(i);
Callable callable = () -> calculateDataRowForProduct(product);
PrioritizedFutureTask<String> dataRowFutureTask = new PrioritizedFutureTask<>(callable, i);
taskIndexMap.put(i, dataRowFutureTask);
executor.execute(dataRowFutureTask);
}
List<Integer> accessOrder = new ArrayList<>();
accessOrder.add(4);
accessOrder.add(7);
accessOrder.add(2);
accessOrder.add(9);
int priority = -1 * accessOrder.size();
for (Integer nextIndex : accessOrder) {
PrioritizedFutureTask taskAtIndex = taskIndexMap.get(nextIndex);
assert (customQueue.remove(taskAtIndex));
customQueue.offer(taskAtIndex.set_priority(priority++));
// Now this task will be at the front of the thread pool queue.
// Hence this task will execute next.
}
for (Integer nextIndex : accessOrder) {
PrioritizedFutureTask<String> dataRowFutureTask = taskIndexMap.get(nextIndex);
String dataRow = dataRowFutureTask.get();
System.out.println("Data row for index " + nextIndex + " = " + dataRow);
}
}
}
class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {
private Integer _priority = 0;
private Callable<T> callable;
public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
super(callable);
this.callable = callable;
_priority = priority;
}
public Integer get_priority() {
return _priority;
}
public PrioritizedFutureTask set_priority(Integer priority) {
_priority = priority;
return this;
}
@Override
public int compareTo(@NotNull PrioritizedFutureTask<T> other) {
if (other == null) {
throw new NullPointerException();
}
return get_priority().compareTo(other.get_priority());
}
}
class CustomRunnableComparator implements Comparator<Runnable> {
@Override
public int compare(Runnable task1, Runnable task2) {
return ((PrioritizedFutureTask)task1).compareTo((PrioritizedFutureTask)task2);
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
Computation done for product0
Computation done for product4
Data row for index 4 = data row for product4
Computation done for product7
Data row for index 7 = data row for product7
Computation done for product2
Data row for index 2 = data row for product2
Computation done for product9
Data row for index 9 = data row for product9
Computation done for product1
Computation done for product3
Computation done for product5
Computation done for product6
Computation done for product8
Run Code Online (Sandbox Code Playgroud)
这里还有一个优化范围。
该customQueue.remove(taskAtIndex)操作的O(n)时间复杂度取决于队列的大小(或产品的总数)。
如果产品数量较少(<= 10^5),可能影响不大。
但否则可能会导致性能问题。
一种解决方案是扩展BlockingPriorityQueue和推出从优先级队列中删除元素的功能,O(logn)而不是 O(n)。
我们可以通过在 PriorityQueue 结构中保留一个哈希图来实现这一点。该哈希图将保留底层数组中该元素的元素计数与索引(或重复情况下的索引)。幸运的是,我之前已经在Python
中实现了这样的堆。
如果您对此优化有更多疑问,最好提出一个新问题。
您可以避免在开始时将所有任务提交给执行程序,而只提交一个后台任务,并在完成后提交下一个任务。如果你想获取第9000行,请立即提交(如果尚未提交):
static class FutureDataRow {
CompletableFuture<DataRow> future;
int index;
List<FutureDataRow> list;
Product product;
FutureDataRow(List<FutureDataRow> list, Product product){
this.list = list;
index = list.size();
list.add(this);
this.product = product;
}
public DataRow get(){
submit();
return future.join();
}
private synchronized void submit(){
if(future == null) future = CompletableFuture.supplyAsync(() ->
calculateDataRowForProduct(product), singleThreadedExecutor);
}
private void background(){
submit();
if(index >= list.size() - 1) return;
future.whenComplete((dr, t) -> list.get(index + 1).background());
}
}
...
List<FutureDataRow> dataRows = new ArrayList<>();
products.forEach(p -> new FutureDataRow(dataRows, p));
dataRows.get(0).background();
Run Code Online (Sandbox Code Playgroud)
如果您希望它们随后导航到下一页,您也可以在 get 方法中提交下一行。
如果您使用多线程执行器并且想要同时运行多个后台任务,则可以修改后台方法以查找列表中下一个未提交的任务,并在当前后台任务完成时启动它。
private synchronized boolean background(){
if(future != null) return false;
submit();
future.whenComplete((dr, t) -> {
for(int i = index + 1; i < list.size(); i++){
if(list.get(i).background()) return;
}
});
return true;
}
Run Code Online (Sandbox Code Playgroud)
您还需要在后台启动前 n 个任务,而不仅仅是第一个任务。
int n = 8; //number of active background tasks
for(int i = 0; i < dataRows.size() && n > 0; i++){
if(dataRows.get(i).background()) n--;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
397 次 |
| 最近记录: |