具有唯一任务的线程池队列

for*_*has 7 java spring multithreading java.util.concurrent

我正在使用ThreadPoolTask​​Executor(spring)来异步执行某些任务.

所需任务将从外部DB加载一些对象到我的系统内存中.我使用的最大线程池大小为10,最大队列大小为100.

假设所有10个线程都被占用从我的数据库中获取对象并创建了一个任务,它将进入队列.现在创建了另一个任务,它应该从DB获取相同的对象(DB中的相同键),它也将进入队列(假设所有10个线程仍然被占用).

因此,我的队列可能很容易完成重复任务,这些任务将依次执行,我不希望这种情况发生.

我认为解决方案应该以独特集合的形式出现,该集合充当线程池队列.在引擎盖下,ThreadPoolTask​​Executor使用LinkedBlockingQueue,它不提供唯一性.

我想到了一些可能的解决方案,但没有一个满足我:

  • 使用ThreadPoolExecutor而不是ThreadPoolTask​​Executor.ThreadPoolExecutor提供了一个构造函数,它允许我确定线程池队列类型,但它需要实现BlockingQueue接口.我找不到保持唯一性的实现.

这导致我尝试扩展LinkedBlockingQueue并覆盖add:

public boolean add(E e)
    if(!this.contains(e)) {
        return super.add(e);
    } else {
        return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

但据我所知,这将导致性能大幅降低,因为该contains方法受到O(n)的限制 - 糟糕的想法.

有什么可以解决我的问题?我的目标是获得良好的性能(在内存性能权衡的情况下,我不介意放弃内存以提高性能).

mar*_*eig 6

使用GuavaListenableFuture你可以做类似的事情(没有经过测试)

Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

String t1 = "abc";
if(uniqueQueue.add(t1)) {
    ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            uniqueQueue.remove(t1);
        }

        @Override
        public void onFailure(Throwable t) {
            uniqueQueue.remove(t1);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

导致

  • 只有当前未处理或队列中的项目才会被添加到队列中(uniqueQueue)
  • 已处理的项目将从中删除 uniqueQueue
  • 你只有队列中最多100件物品

这个实现不处理

  • Exceptions通过该submit()方法抛出
  • 最大项目数 unqiueQueue

根据您将数据库中的对象加载到内存中的要求,您可能需要查看Guava的Caches.

更新:

  • 该解决方案可以以相同的方式实现,但没有Guava和ListenableFuture.你可以从ThreadPoolExecutor覆盖`afterExecute(Runnable r,Throwable t)`方法并做同样的事情. (2认同)
  • 我最终扩展了ThreadPoolTask​​Executor(spring)并覆盖了"execute"方法. (2认同)