用Java编写多线程映射迭代器

Mic*_*Kay 12 java multithreading

我有一个通用的映射迭代器:类似这样的东西:

class Mapper<F, T> implements Iterator<T> {

  private Iterator<F> input;
  private Action<F, T> action;

  public Mapper(input, action) {...}

  public boolean hasNext() {
    return input.hasNext();
  }

  public T next() {
    return action.process(input.next());
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,假设action.process()可能非常耗时,我希望通过使用多个线程并行处理输入项来获得性能.我想分配一个N个工作线程池,并将项目分配给这些线程进行处理.这应该在"幕后"发生,因此客户端代码只能看到Iterator.代码应避免将输入或输出序列保存在内存中.

为了添加一个扭曲,我想要两个版本的解决方案,一个保留订单(最终迭代器以与输入迭代器相同的顺序交付项目),其中一个不一定保留订单(每个输出项目尽快交付)它是可用的).

我有点工作,但代码似乎令人费解和不可靠,我不相信它正在使用最佳实践.

有关最简单,最可靠的实施方法的建议吗?我正在寻找适用于JDK 6的东西,我想尽可能避免在外部库/框架上引入依赖.

Old*_*eon 5

我将使用线程池作为线程,并BlockingQueue从池中提供数据。

这似乎适用于我的简单测试用例。

interface Action<F, T> {

    public T process(F f);

}

class Mapper<F, T> implements Iterator<T> {

    protected final Iterator<F> input;
    protected final Action<F, T> action;

    public Mapper(Iterator<F> input, Action<F, T> action) {
        this.input = input;
        this.action = action;
    }

    @Override
    public boolean hasNext() {
        return input.hasNext();
    }

    @Override
    public T next() {
        return action.process(input.next());
    }
}

class ParallelMapper<F, T> extends Mapper<F, T> {

    // The pool.
    final ExecutorService pool;
    // The queue.
    final BlockingQueue<T> queue;
    // The next one to deliver.
    private T next = null;

    public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
        super(input, action);
        // Start my pool.
        pool = Executors.newFixedThreadPool(threads);
        // And the queue.
        queue = new ArrayBlockingQueue<>(queueLength);
    }

    class Worker implements Runnable {

        final F f;
        private T t;

        public Worker(F f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                queue.put(action.process(f));
            } catch (InterruptedException ex) {
                // Not sure what you can do here.
            }
        }

    }

    @Override
    public boolean hasNext() {
        // All done if delivered it and the input is empty and the queue is empty and the threads are finished.
        while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
            // First look in the queue.
            next = queue.poll();
            if (next == null) {
                // Queue empty.
                if (input.hasNext()) {
                    // Start a new worker.
                    pool.execute(new Worker(input.next()));
                }
            } else {
                // Input exhausted - shut down the pool - unless we already have.
                if (!pool.isShutdown()) {
                    pool.shutdown();
                }
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        T n = next;
        if (n != null) {
            // Delivered that one.
            next = null;
        } else {
            // Fails.
            throw new NoSuchElementException();
        }
        return n;
    }
}

public void test() {
    List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
    System.out.println("Data");
    for (Integer i : Iterables.in(data)) {
        System.out.println(i);
    }
    Action<Integer, Integer> action = new Action<Integer, Integer>() {

        @Override
        public Integer process(Integer f) {
            try {
                // Wait that many seconds.
                Thread.sleep(1000L * f);
            } catch (InterruptedException ex) {
                // Just give up.
            }
            // Return it unchanged.
            return f;
        }

    };
    System.out.println("Processed");
    for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
        System.out.println(i);
    }
    System.out.println("Parallel Processed");
    for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
        System.out.println(i);
    }

}
Run Code Online (Sandbox Code Playgroud)

注意:Iterables.in(Iterator<T>)只需创建一个Iterable<T>封装传递的的Iterator<T>

对于有序的线程,您可以处理Pair<Integer,F>并将其PriorityQueue用于线程输出。然后,您可以安排按顺序拉动它们。