决定对给定数据集使用映射器和缩减器数量以实现最佳性能的因素有哪些?我说的是 Apache Hadoop MapReduce 平台。
我正在运行教程 http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html
在某些时候我们会使用mapReduceTriplets 操作。这将返回预期结果
// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
// For each edge send a message to the destination vertex with the attribute of the source vertex
edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
// To combine messages take the message for the older follower
(a, b) => if (a._2 > b._2) a else b
)
Run Code Online (Sandbox Code Playgroud)
但 IntelliJ 指出 mapReduceTriplets 已被弃用(从 1.2.0 开始),应该由aggregateMessages 替换
// Find the oldest follower for each user …Run Code Online (Sandbox Code Playgroud) 我正在对 Hadoop 框架进行一些研究。我想问一下框架中可以使用的属性..例如
有什么参考资料可以让我了解这个框架的整个属性列表吗?非常希望有人可以帮助我。
另外我想问一下,io.sort.mb和mapreduce.task.io.sort.mb有什么区别吗?或者他们只是一样?
YARN是Hadoop第二代,不再使用jobtracker守护进程,而是用资源管理器代替。但是为什么在mapred-site.xml hadoop 2上有一个mapreduce.jobtracker.address属性呢?
我正在运行一个地图缩减程序来读取 HDFS 文件,如下所示:
hadoop jar /opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar -Dmapred.reduce.tasks=1000 -file $homedir/mapper.py -mapper $homedir/mapper.py -file $homedir/reducer.py -reducer $homedir/reducer.py -input /user/data/* -output /output/ 2> output.text
Run Code Online (Sandbox Code Playgroud)
有什么需要确认的,路径 /user/data/* 包含包含文件的文件夹, /user/data/* 将迭代所有子文件夹下的所有文件,对吗?
hdfs 文本文件的每一行都包含一个 JSON 字符串,因此映射器读取该文件如下:
for line in sys.stdin:
try:
object = json.loads(line)
Run Code Online (Sandbox Code Playgroud)
但 HDFS 的所有者将文件从文本更改为序列文件。我发现mapreduce程序输出了很多零大小的文件,这可能意味着它没有成功从HDFS读取文件。
我应该对代码进行哪些更改才能从序列文件中读取内容?我还有一个 HIVE 外部表来根据 mapreduce 的输出执行聚合和排序,并且 HIVE 之前是 STORED AS TEXTFILE ,我应该更改为 STORED AS SEQUENCEFILE 吗?
谢谢,
val drdd = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("name", "value").toJavaRDD
drdd.map{ (row: Row) => row.get(0) }
Run Code Online (Sandbox Code Playgroud)
看起来我传递的匿名函数是 Row => Any 而它期待 org.apache.spark.api.java.function.Function[org.apache.spark.sql.Row,?]
<console>:35: error: type mismatch;
found : org.apache.spark.sql.Row => Any
required: org.apache.spark.api.java.function.Function[org.apache.spark.sql.Row,?]
drdd.map{ (row: Row) => row.get(0) }
^
Run Code Online (Sandbox Code Playgroud)
这些函数类型之间有什么区别,我应该如何构造它?谢谢!
我正在学习 Hadoop 并知道该框架有两个版本,即:Hadoop1 和 Hadoop2。
如果我的理解是正确的,在 Hadoop1 中,执行环境基于两个守护进程即TaskTracker,JobTracker而在 Hadoop2(又名 yarn)中,执行环境基于“新守护进程”即ResourceManager, NodeManager, ApplicationMaster.
如果这不正确,请纠正我。
mapreduce.framework.name
可以采用的可能值:local,classic,yarn
我不明白它们到底是什么意思;例如,如果我安装 Hadoop 2 ,那么它怎么会有旧的执行环境(有TaskTracker, JobTracker)。
谁能帮助我这些值是什么意思?
我是 python 新手,正在尝试处理大数据代码,但无法理解表达式re.compile(r"[\w']+") 的含义。有人对此有任何想法吗?
这是我使用的代码。
from mrjob.job import MRJob
import re
WORD_REGEXP = re.compile(r"[\w']+")
class MRWordFrequencyCount(MRJob):
def mapper(self, _, line):
words = WORD_REGEXP.findall(line)
for word in words:
yield word.lower(), 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRWordFrequencyCount.run()
Run Code Online (Sandbox Code Playgroud) 我有一个包含呼叫数据记录(CDR)的配置单元表。我在电话号码上对表进行了分区,并在 call_date 上进行了存储。现在,当我将数据插入配置单元时,过时的 call_date 正在我的存储桶中创建小文件,这导致名称节点元数据增加和性能下降。有没有办法将这些小文件合并成一个。
我想创建一个接收任意数量参数并根据数量取平均值的函数。论据。所以我尝试了如下代码;
function takeAvg(...nums) {
return nums.reduce((total, curVal) => {
return (total + curVal) / nums.length;
});
}
console.log(takeAvg(10, 20)) // returns 15 - no problem
console.log(takeAvg(10, 20, 30)) // returns 13.333333333333334 weirdly.Run Code Online (Sandbox Code Playgroud)
我认为问题出在 nums.legth 上,但我不明白为什么?请帮忙。谢谢。
mapreduce ×10
hadoop ×6
apache-spark ×2
bigdata ×2
hadoop-yarn ×2
hdfs ×2
hive ×2
java ×2
python ×2
average ×1
dictionary ×1
hadoop2 ×1
javascript ×1
jobs ×1
mrv2 ×1
python-2.7 ×1
rdd ×1
scala ×1
sequencefile ×1
spark-graphx ×1
xml ×1