Mic*_*Kay 12 java multithreading
我有一个通用的映射迭代器:类似这样的东西:
class Mapper<F, T> implements Iterator<T> {
private Iterator<F> input;
private Action<F, T> action;
public Mapper(input, action) {...}
public boolean hasNext() {
return input.hasNext();
}
public T next() {
return action.process(input.next());
}
}
Run Code Online (Sandbox Code Playgroud)
现在,假设action.process()可能非常耗时,我希望通过使用多个线程并行处理输入项来获得性能.我想分配一个N个工作线程池,并将项目分配给这些线程进行处理.这应该在"幕后"发生,因此客户端代码只能看到Iterator.代码应避免将输入或输出序列保存在内存中.
为了添加一个扭曲,我想要两个版本的解决方案,一个保留订单(最终迭代器以与输入迭代器相同的顺序交付项目),其中一个不一定保留订单(每个输出项目尽快交付)它是可用的).
我有点工作,但代码似乎令人费解和不可靠,我不相信它正在使用最佳实践.
有关最简单,最可靠的实施方法的建议吗?我正在寻找适用于JDK 6的东西,我想尽可能避免在外部库/框架上引入依赖.
我将使用线程池作为线程,并BlockingQueue从池中提供数据。
这似乎适用于我的简单测试用例。
interface Action<F, T> {
public T process(F f);
}
class Mapper<F, T> implements Iterator<T> {
protected final Iterator<F> input;
protected final Action<F, T> action;
public Mapper(Iterator<F> input, Action<F, T> action) {
this.input = input;
this.action = action;
}
@Override
public boolean hasNext() {
return input.hasNext();
}
@Override
public T next() {
return action.process(input.next());
}
}
class ParallelMapper<F, T> extends Mapper<F, T> {
// The pool.
final ExecutorService pool;
// The queue.
final BlockingQueue<T> queue;
// The next one to deliver.
private T next = null;
public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
super(input, action);
// Start my pool.
pool = Executors.newFixedThreadPool(threads);
// And the queue.
queue = new ArrayBlockingQueue<>(queueLength);
}
class Worker implements Runnable {
final F f;
private T t;
public Worker(F f) {
this.f = f;
}
@Override
public void run() {
try {
queue.put(action.process(f));
} catch (InterruptedException ex) {
// Not sure what you can do here.
}
}
}
@Override
public boolean hasNext() {
// All done if delivered it and the input is empty and the queue is empty and the threads are finished.
while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
// First look in the queue.
next = queue.poll();
if (next == null) {
// Queue empty.
if (input.hasNext()) {
// Start a new worker.
pool.execute(new Worker(input.next()));
}
} else {
// Input exhausted - shut down the pool - unless we already have.
if (!pool.isShutdown()) {
pool.shutdown();
}
}
}
return next != null;
}
@Override
public T next() {
T n = next;
if (n != null) {
// Delivered that one.
next = null;
} else {
// Fails.
throw new NoSuchElementException();
}
return n;
}
}
public void test() {
List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
System.out.println("Data");
for (Integer i : Iterables.in(data)) {
System.out.println(i);
}
Action<Integer, Integer> action = new Action<Integer, Integer>() {
@Override
public Integer process(Integer f) {
try {
// Wait that many seconds.
Thread.sleep(1000L * f);
} catch (InterruptedException ex) {
// Just give up.
}
// Return it unchanged.
return f;
}
};
System.out.println("Processed");
for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
System.out.println(i);
}
System.out.println("Parallel Processed");
for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
System.out.println(i);
}
}
Run Code Online (Sandbox Code Playgroud)
注意:Iterables.in(Iterator<T>)只需创建一个Iterable<T>封装传递的的Iterator<T>。
对于有序的线程,您可以处理Pair<Integer,F>并将其PriorityQueue用于线程输出。然后,您可以安排按顺序拉动它们。