简单的Java Map/Reduce框架

ska*_*man 49 java mapreduce

任何人都可以指向一个简单的,开源的Map/Reduce框架/ API for Java?似乎没有太多证据证明存在这样的事情,但其他人可能知道不同.

当然,我能找到的最好的是Hadoop MapReduce,但这不符合"简单"标准.我不需要运行分布式作业的能力,只需要让我在一个JVM中使用标准Java5风格的并发在多核机器上运行map/reduce-style作业.

写自己并不难,但我宁愿不必这么做.

cha*_*ory 18

你看看阿卡吗?虽然akka实际上是一个基于分布式Actor模型的并发框架,但只需很少的代码就可以实现很多东西.将工作分成几部分非常容易,它可以自动充分利用多核机器,并能够使用多台机器来处理工作.与使用线程不同,对我来说感觉更自然.

我有一个使用akka 的Java map reduce示例.这不是最简单的地图缩减示例,因为它利用了期货; 但它应该让你大致了解所涉及的内容.我的map reduce示例演示了几个主要内容:

  • 如何划分工作.
  • 如何分配工作:akka有一个非常简单的消息系统,以及一个工作分区,你可以配置你的日程安排.一旦我学会了如何使用它,我就无法停止.它非常简单灵活.我立即使用了所有四个CPU核心.这对于实现服务非常有用.
  • 如何知道工作何时完成以及结果是否已准备好处理:除非您已经熟悉Futures,否则实际上这部分可能是最困难和最容易理解的部分.您不需要使用期货,因为还有其他选择.我只是用它们,因为我想让人们更短暂的东西.

如果您有任何疑问,StackOverflow实际上有一个很棒的akka​​ QA部分.


Luk*_*der 11

我认为值得一提的是,这些问题都是Java 8的历史.例如:

int heaviestBlueBlock =
    blocks.filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);
Run Code Online (Sandbox Code Playgroud)

换句话说:单节点MapReduce在Java 8中可用.

有关更多详细信息,请参阅Brian Goetz关于项目lambda的演示文稿

  • @skaffman:如果lambda最终没有成功,我会哭的! (4认同)

Pet*_*rey 10

我使用以下结构

int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
    results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
    reduce(future);
Run Code Online (Sandbox Code Playgroud)

  • @skaffman,你想要比最简单的解决方案更复杂的东西,但比完整的解决方案更简单.金锁解决方案.;)也许你可以说出你的最低要求是什么. (8认同)
  • 嗯......那不是map-reduce,那只是一个赤裸的执行者. (6认同)

Gar*_*vis 8

我意识到这可能是事后的一点,但你可能想看看JDK7 的JSR166y ForkJoin类.

有一个后端移植的库可以在JDK6下工作,没有任何问题,所以你不必等到下一个千年才能使用它.它位于原始执行器和hadoop之间,为当前JVM中的map reduce工作提供框架.


xan*_*xan 6

几年前,当我买了一台8芯机器时,我为自己创造了一次性产品,但我对此并不十分满意.我从来没有像我希望的那样简单地使用它,并且内存密集型任务不能很好地扩展.

如果你没有得到任何真正的答案,我可以分享更多,但它的核心是:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:根据评论,下面是没有的版本sleep.诀窍是使用CompletionService哪个基本上提供了已完成Futures 的阻塞队列.

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool = 
                  new ExecutorCompletionService<TMapOutput>(pool);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        int n = futureSet.size();
        for (int i = 0; i < n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }
Run Code Online (Sandbox Code Playgroud)

我还会注意到这是一个非常精简的map-reduce算法,包括一个reduce worker,它同时执行reduce和merge操作.


Flo*_*ilz 5

我喜欢在Java中使用Skandium进行并行化.该框架为具有共享内存的多核机器实现了某些并行模式(即Master-Slave,Map/Reduce,Pipe,Fork和Divide&Conquer).这种技术被称为"算法骨架".模式可以嵌套.

详细说,有骷髅和肌肉.肌肉做实际的工作(分裂,合并,执行和条件).骷髅代表并行度的模式,除了"While","For"和"If",这在嵌套模式时很有用.

可以在框架内找到示例.我需要一点了解如何使用肌肉和骨骼,但在克服了这个障碍之后我真的很喜欢这个框架.:)