Mar*_*llo 12 java concurrency multithreading scalability
我有一个基于数组的对象,它实现了以下接口:
public interface PairSupplier<Q, E> {
public int size();
public Pair<Q, E> get(int index);
}
Run Code Online (Sandbox Code Playgroud)
我想在它上面创建一个特定的迭代器:
public boolean hasNext(){
return true;
}
public Pair<Q, E> next(){
//some magic
}
Run Code Online (Sandbox Code Playgroud)
在接下来的方法中,我想从PairSupplier返回一些元素.
这个元素对于线程应该是唯一的,其他线程不应该有这个元素.
由于PairSupplier具有最终大小,因此这种情况并非总是可行,但我想接近它.
元素的顺序无关紧要,线程可以在不同的时间使用相同的元素.
示例: 2 Threads,5 elements-{1,2,3,4,5}
Thread 1 | Thread 2
1 2
3 4
5 1
3 2
4 5
Run Code Online (Sandbox Code Playgroud)
我的解决方案:
我创建AtomicInteger索引,我会在每次下一次调用时递增.
PairSupplier pairs;
AtomicInteger index;
public boolean hasNext(){
return true;
}
public Pair<Q, E> next(){
int position = index.incrementAndGet() % pairs.size;
if (position < 0) {
position *= -1;
position = pairs.size - position;
}
return pairs.get(position);
}
Run Code Online (Sandbox Code Playgroud)
对和索引在所有线程之间共享.
我发现这个解决方案不可扩展(因为所有线程都需要增量),也许有人有更好的想法?
这个迭代器将被50-1000个线程使用.
您的问题详细信息不明确 - 您的示例表明两个线程可以相同Pair,但您在描述中另有说明。
由于更难实现,我将提供一种Iterable<Pair<Q,E>>将Pair每个线程交付一个直到供应商循环 - 然后它将重复。
public interface Supplier<T> {
public int size();
public T get(int index);
}
public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}
public class IterableSupplier<T> implements Iterable<T> {
// The common supplier to use across all threads.
final Supplier<T> supplier;
// The atomic counter.
final AtomicInteger i = new AtomicInteger();
public IterableSupplier(Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public Iterator<T> iterator() {
/**
* You may create a NEW iterator for each thread while they all share supplier
* and Will therefore distribute each Pair between different threads.
*
* You may also share the same iterator across multiple threads.
*
* No two threads will get the same pair twice unless the sequence cycles.
*/
return new ThreadSafeIterator();
}
private class ThreadSafeIterator implements Iterator<T> {
@Override
public boolean hasNext() {
/**
* Always true.
*/
return true;
}
private int pickNext() {
// Just grab one atomically.
int pick = i.incrementAndGet();
// Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
int actual = pick % supplier.size();
if (pick != actual) {
// So long as someone has a success before we overflow int we're good.
i.compareAndSet(pick, actual);
}
return actual;
}
@Override
public T next() {
return supplier.get(pickNext());
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported.");
}
}
}
Run Code Online (Sandbox Code Playgroud)
注意:我对代码做了一些调整以适应这两种情况。您可以为Iterator每个线程获取一个或跨线程共享一个Iterator。
| 归档时间: |
|
| 查看次数: |
1282 次 |
| 最近记录: |