任何人都可以指向一个简单的,开源的Map/Reduce框架/ API for Java?似乎没有太多证据证明存在这样的事情,但其他人可能知道不同.
当然,我能找到的最好的是Hadoop MapReduce,但这不符合"简单"标准.我不需要运行分布式作业的能力,只需要让我在一个JVM中使用标准Java5风格的并发在多核机器上运行map/reduce-style作业.
写自己并不难,但我宁愿不必这么做.
cha*_*ory 18
你看看阿卡吗?虽然akka实际上是一个基于分布式Actor模型的并发框架,但只需很少的代码就可以实现很多东西.将工作分成几部分非常容易,它可以自动充分利用多核机器,并能够使用多台机器来处理工作.与使用线程不同,对我来说感觉更自然.
我有一个使用akka 的Java map reduce示例.这不是最简单的地图缩减示例,因为它利用了期货; 但它应该让你大致了解所涉及的内容.我的map reduce示例演示了几个主要内容:
如果您有任何疑问,StackOverflow实际上有一个很棒的akka QA部分.
Luk*_*der 11
我认为值得一提的是,这些问题都是Java 8的历史.例如:
int heaviestBlueBlock =
blocks.filter(b -> b.getColor() == BLUE)
.map(Block::getWeight)
.reduce(0, Integer::max);
Run Code Online (Sandbox Code Playgroud)
换句话说:单节点MapReduce在Java 8中可用.
有关更多详细信息,请参阅Brian Goetz关于项目lambda的演示文稿
Pet*_*rey 10
我使用以下结构
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);
List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
reduce(future);
Run Code Online (Sandbox Code Playgroud)
我意识到这可能是事后的一点,但你可能想看看JDK7 的JSR166y ForkJoin类.
有一个后端移植的库可以在JDK6下工作,没有任何问题,所以你不必等到下一个千年才能使用它.它位于原始执行器和hadoop之间,为当前JVM中的map reduce工作提供框架.
几年前,当我买了一台8芯机器时,我为自己创造了一次性产品,但我对此并不十分满意.我从来没有像我希望的那样简单地使用它,并且内存密集型任务不能很好地扩展.
如果你没有得到任何真正的答案,我可以分享更多,但它的核心是:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:根据评论,下面是没有的版本sleep.诀窍是使用CompletionService哪个基本上提供了已完成Futures 的阻塞队列.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
Run Code Online (Sandbox Code Playgroud)
我还会注意到这是一个非常精简的map-reduce算法,包括一个reduce worker,它同时执行reduce和merge操作.
| 归档时间: |
|
| 查看次数: |
31523 次 |
| 最近记录: |