Com*_*low 6 python hadoop mapreduce hadoop-streaming
我的群集中有两个文件,File A并File B带有以下数据 -
档案A.
#Format:
#Food Item | Is_A_Fruit (BOOL)
Orange | Yes
Pineapple | Yes
Cucumber | No
Carrot | No
Mango | Yes
Run Code Online (Sandbox Code Playgroud)
档案B.
#Format:
#Food Item | Vendor Name
Orange | Vendor A
Pineapple | Vendor B
Cucumber | Vendor B
Carrot | Vendor B
Mango | Vendor A
Run Code Online (Sandbox Code Playgroud)
基本上我想知道 每个供应商销售多少水果?
预期产量:
Vendor A | 2
Vendor B | 1
Run Code Online (Sandbox Code Playgroud)
我需要使用hadoop流式传输python map reduce.
我已经阅读了如何做一个基本的单词计数,我读取sys.stdin并k,v为减速器发出对然后减少.
我该如何处理这个问题?
我主要关注的是如何读取多个文件,然后在Hadoop Streaming中进行比较.
我可以在普通的python中做到这一点(即没有MapReduce和Hadoop,它很简单.)但是对于我拥有的庞大数据大小它是不可行的.
文件A真的那么大吗?我会把它放在DistributedCache中并从那里读取它.要将其放入分布式缓存中,请在Hadoop流式调用中使用此选项:
-cacheFile 'hdfs://namenode:port/the/hdfs/path/to/FileA#FileA'
Run Code Online (Sandbox Code Playgroud)
(我想以下应该也可以,但我还没试过:)
-cacheFile '/the/hdfs/path/to/FileA#FileA'
Run Code Online (Sandbox Code Playgroud)
请注意,这#fileA是您用于使映射器可以使用该文件的名称.
然后,在你的映射器中,你将从中读取FileB sys.stdin(假设你使用了Hadoop Streaming -input '/user/foo/FileB'),并且要读取FileA,你应该这样做:
f = open('FileA', 'r')
...
f.readline()
Run Code Online (Sandbox Code Playgroud)
现在,我想你已经想到了这一点,但对我来说,有一个像这样的映射器是有道理的: