如何从每个线程的数组中读取唯一元素?

Mar*_*llo 12 java concurrency multithreading scalability

我有一个基于数组的对象,它实现了以下接口:

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}
Run Code Online (Sandbox Code Playgroud)

我想在它上面创建一个特定的迭代器:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}
Run Code Online (Sandbox Code Playgroud)

接下来的方法中,我想从PairSupplier返回一些元素.

这个元素对于线程应该是唯一的,其他线程不应该有这个元素.

由于PairSupplier具有最终大小,因此这种情况并非总是可行,但我想接近它.

元素的顺序无关紧要,线程可以在不同的时间使用相同的元素.

示例: 2 Threads,5 elements-{1,2,3,4,5}

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5
Run Code Online (Sandbox Code Playgroud)

我的解决方案:

我创建AtomicInteger索引,我会在每次下一次调用时递增.

PairSupplier pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     int position = index.incrementAndGet() % pairs.size;
     if (position < 0) {
          position *= -1;
          position = pairs.size - position;
     }
     return pairs.get(position);
}
Run Code Online (Sandbox Code Playgroud)

索引在所有线程之间共享.

我发现这个解决方案不可扩展(因为所有线程都需要增量),也许有人有更好的想法?

这个迭代器将被50-1000个线程使用.

Old*_*eon 4

您的问题详细信息不明确 - 您的示例表明两个线程可以相同Pair,但您在描述中另有说明。

由于更难实现,我将提供一种Iterable<Pair<Q,E>>Pair每个线程交付一个直到供应商循环 - 然后它将重复。

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a NEW iterator for each thread while they all share supplier
     * and Will therefore distribute each Pair between different threads.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same pair twice unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
      int actual = pick % supplier.size();
      if (pick != actual) {
        // So long as someone has a success before we overflow int we're good.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}
Run Code Online (Sandbox Code Playgroud)

注意:我对代码做了一些调整以适应这两种情况。您可以为Iterator每个线程获取一个或跨线程共享一个Iterator