我对 Map/Reduce 原则和 python mrjob 框架非常陌生,我编写了这个示例代码,它运行良好,但我想知道我可以对其进行哪些更改以使其“完美”/更高效。
from mrjob.job import MRJob
import operator
import re
# append result from each reducer
output_words = []
class MRSudo(MRJob):
def init_mapper(self):
# move list of tuples across mapper
self.words = []
def mapper(self, _, line):
command = line.split()[-1]
self.words.append((command, 1))
def final_mapper(self):
for word_pair in self.words:
yield word_pair
def reducer(self, command, count):
# append tuples to the list
output_words.append((command, sum(count)))
def final_reducer(self):
# Sort tuples in the list by occurence
map(operator.itemgetter(1), output_words)
sorted_words = …Run Code Online (Sandbox Code Playgroud) 我在我的笔记本电脑上设置了伪分布式 hadoop 2.2.0 环境。我可以运行 mapreduce 应用程序(包括 Pig 和 Hive 作业),并且可以从 Web UI 看到应用程序的状态 http://localhost:8088
我已经下载了 Spark 库,并且只是将文件系统-HDFS 用于 Spark 应用程序。当我启动一个 Spark 应用程序时,它正在启动并且执行也按预期成功完成。
但是 Web UIhttp://localhost:8088没有列出已完成/启动的 Spark 应用程序。
请建议是否需要任何其他额外配置才能在 Web UI 中查看 Spark 应用程序。
(注意:http://localhost:50070当尝试通过 Spark 应用程序将文件写入 HDFS 时,此 Web UI 会正确显示文件)
我在我的收藏中有这些文件:
{_id: "aaaaaaaa", email: "mail1@orange.fr"},
{_id: "bbbbbbbb", email: "mail2@orange.fr"},
{_id: "cccccccc", email: "mail3@orange.fr"},
{_id: "dddddddd", email: "mail4@gmail.com"},
{_id: "eeeeeeee", email: "mail5@gmail.com"},
{_id: "ffffffff", email: "mail6@yahoo.com"}
Run Code Online (Sandbox Code Playgroud)
我希望这个结果:
{
result: [
{domain: "orange.fr", count: 3},
{domain: "gmail.com", count: 2},
{domain: "yahoo.com", count: 1},
]
}
Run Code Online (Sandbox Code Playgroud)
我不确定你可以使用聚合器和$ regex运算符
我为以下数据结构创建了MapReduce作业:
{ "_id" : 1), "docid" : 119428, "term" : 5068, "score" : 0.198 }
{ "_id" : 2), "docid" : 154690, "term" : 5068, "score" : 0.21 }
{ "_id" : 3), "docid" : 156278, "term" : 5068, "score" : 0.128 }
{ "_id" : 4), "docid" : 700, "term" : "fire", "score" : 0.058 }
{ "_id" : 5), "docid" : 857, "term" : "fire", "score" : 0.133 }
{ "_id" : 6), "docid" : 900, "term" : …Run Code Online (Sandbox Code Playgroud) 我很困惑,因为我找到了两个答案.
1)根据Hadoop权威指南 - 第3版,第6章 - Map Side说:"在写入磁盘之前,线程首先将数据划分为与最终将被发送到的reducer相对应的分区.在每个分区中,后台线程按键执行内存中排序,如果有组合器功能,则在排序输出上运行.
2)雅虎开发人员教程(雅虎教程)称Combiner在分区之前运行.
任何人都可以先澄清哪个运行.
我目前正在研究GeoSpatial项目,我使用MongoDB作为数据库,Meteor用于创建我的应用程序.我在MongoDB中对空间数据运行了一些查询(使用mapReduce),我想把这个代码(查询)放在Meteor(javascript文件)上.我做了一些研究但仍然有问题.我不知道如何在流星中编写mapReduce函数.
这是我在MongoDB中的mapReduce代码:
var map1 = function() {
var px =-83.215;
var py =41.53;
if(this.geometry.minlon<=px && px<=this.geometry.maxlon && this.geometry.minlat<=py && py<=this.geometry.maxlat)
{emit(this._id, 1);}
}
var reduce1 = function(key, value) {
return Array.sum(value)
}
db.C1DB.mapReduce(map1, reduce1, {
out: "CollectionName"
})
Run Code Online (Sandbox Code Playgroud) 当我编写mapreduce程序时,我经常编写代码
job1.setMapOutputKeyClass(Text.class);
Run Code Online (Sandbox Code Playgroud)
但是为什么我们要明确指定MapOutputKeyClass呢?我们已经在mapper类中进行了spicify,例如
public static class MyMapper extends
Mapper<LongWritable, Text, Text, Text>
Run Code Online (Sandbox Code Playgroud)
在Hadoop:权威指南一书中,有一个表显示方法setMapOutputKeyClass是可选的(属性用于配置类型),但是在我测试时,我发现它是必要的,或者eclipse的控制台会显示
Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
Run Code Online (Sandbox Code Playgroud)
有人能告诉我它的原因吗?
在书中,它说
"必须与MapReduce类型兼容的设置列于表8-1的下半部分".这是否意味着我们必须设置下部属性类型,但不必设置较高部分属性类型?
表的内容如下所示:
Properties for configuring types:
mapreduce.job.inputformat.class
mapreduce.map.output.key.class
mapreduce.map.output.value.class
mapreduce.job.output.key.class
mapreduce.job.output.value.class
Properties that must be consistent with the types:
mapreduce.job.map.class
mapreduce.job.combine.class
mapreduce.job.partitioner.class
mapreduce.job.output.key.comparator.class
mapreduce.job.output.group.comparator.class
mapreduce.job.reduce.class
mapreduce.job.outputformat.class
Run Code Online (Sandbox Code Playgroud) 假设我有复合数组用于创建视图.我想用相同的第一个键查询所有对象.所以我用?startKey=["someKey"]
但是我可以使用的最高端键是什么?
我可能会用&endKey=["someKey",{}].但会{}比大{ someOtherKey:{} }吗?
如果没有那么我可以使用什么?
我需要在HDFS上存储大约10TB的大文件.我需要了解的是HDFS将如何存储此文件.比如,群集的复制因子是3,我有一个10节点群集,每个节点上有超过10 TB的磁盘空间,即总群集容量超过100TB.
现在,HDFS随机选择三个节点并将文件存储在这三个节点上.那么这就像听起来一样简单.请确认?
或者HDFS将文件分割 - 比如说每个10T分割1TB,然后将每个分割存储在随机选择的3个节点上.分裂是可能的,如果是,则是启用它的配置方面.如果HDFS必须拆分二进制文件或文本文件 - 它是如何拆分的.只需按字节.
我有一个映射步骤,其中我并行渲染图像的大量扇区:
1 2
3 4
worker a -> 1
worker b -> 2
...
merge 1,2,3,4 to make final image
Run Code Online (Sandbox Code Playgroud)
对于相对较小且可以放入RAM的图像,可以简单地使用PIL的功能:
def merge_images(image_files, x, y):
images = map(Image.open, image_files)
width, height = images[0].size
new_im = Image.new('RGB', (width * x, height * y))
for n, im in enumerate(images):
new_im.paste(im, ((n%x) * width, (n//y) * height))
return new_im
Run Code Online (Sandbox Code Playgroud)
不幸的是,我将有很多很多大部门。我想将这些图片最终合并成一个约40,000 x 60,000像素的单张图像,估计约为20 GB。(甚至更多)
因此,显然,我们无法在RAM上解决此问题。我知道还有其他选择,例如memmap'数组和写入切片,我会尝试的。但是,我正在寻找可能的解决方案。
有谁知道更简单的选择吗?即使到目前为止我尝试过的所有方法都在python中,但也不必在python中。