Java,在多线程evnironments中通过散列统一划分传入的工作

Iva*_*lin 4 java math concurrency hash multithreading

我已经Runnable使用基于hashCode模块的n个线程实现了一个java代码来执行传入的任务(as )nThreads.这些工作应该在这些线程中理想地 - 统一地 - 传播.具体来说,我们有一个dispatchId作为每个任务的字符串.

这是这个java代码片段:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}
Run Code Online (Sandbox Code Playgroud)

重要提示:在大多数情况下,dispatchId是:

String dispatchId = 'SomePrefix' + counter.next()
Run Code Online (Sandbox Code Playgroud)

但是,我担心nThreads的模除法不是一个好的选择,因为nThreads应该是一个素数,用于更均匀地分配dispatId键.

关于如何更好地传播工作还有其他选择吗?

更新1:

每个Worker都有一个队列: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

工作人员从中获取任务并执行它们.任务可以从其他线程添加到此队列.

更新2:

具有相同的任务dispatchId可以多次进行,因此我们需要找到他们的线程dispatchId.

最重要的是,每个Worker线程必须按顺序处理其传入的任务.因此,上面的更新1中存在数据结构队列.

更新3: 此外,一些线程可能很忙,而其他线程是免费的.因此,我们需要以某种方式将队列与线程分离,但dispatchId为任务执行保持相同的FIFO顺序.

解决方案: 我已经实现了Ben Manes的想法(他的答案如下),代码可以在这里找到.

Ben*_*nes 8

听起来你需要每个调度ID的FIFO排序,所以理想的情况是将调度队列作为抽象.这可以解释你对哈希关注不能提供统一分布的担忧,因为一些调度队列可能比其他队列更活跃,并且在工人之间不公平地平衡.通过将队列与worker分开,您可以保留FIFO语义并均匀地分散工作.

HawtDispatch是一个提供此抽象的非活动库.它与Java 6兼容.

一个非常简单的Java 8方法是使用CompletableFuture作为排队机制,使用ConcurrentHashMap进行注册,使用Executor(例如ForkJoinPool)进行计算.有关此想法的实现,请参阅EventDispatcher,其中注册是显式的.如果您的调度员更具动态性,那么您可能需要定期修剪地图.基本思路如下.

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}
Run Code Online (Sandbox Code Playgroud)

更新(JDK7)

上述想法的后端将与番石榴翻译成类似的东西,

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}
Run Code Online (Sandbox Code Playgroud)