Python MapReduce Hadoop Streaming Job需要多个输入文件?

Com*_*low 6 python hadoop mapreduce hadoop-streaming

我的群集中有两个文件,File AFile B带有以下数据 -

档案A.

#Format: 
#Food Item | Is_A_Fruit (BOOL)

Orange | Yes
Pineapple | Yes
Cucumber | No
Carrot | No
Mango | Yes
Run Code Online (Sandbox Code Playgroud)

档案B.

#Format:
#Food Item | Vendor Name

Orange | Vendor A
Pineapple | Vendor B
Cucumber | Vendor B
Carrot | Vendor B
Mango | Vendor A
Run Code Online (Sandbox Code Playgroud)

基本上我想知道 每个供应商销售多少水果?

预期产量:

Vendor A | 2
Vendor B | 1
Run Code Online (Sandbox Code Playgroud)

我需要使用hadoop流式传输python map reduce.

我已经阅读了如何做一个基本的单词计数,我读取sys.stdink,v为减速器发出对然后减少.

我该如何处理这个问题?

我主要关注的是如何读取多个文件,然后在Hadoop Streaming中进行比较.

我可以在普通的python中做到这一点(即没有MapReduce和Hadoop,它很简单.)但是对于我拥有的庞大数据大小它是不可行的.

cab*_*bad 6

文件A真的那么大吗?我会把它放在DistributedCache中并从那里读取它.要将其放入分布式缓存中,请在Hadoop流式调用中使用此选项:

-cacheFile 'hdfs://namenode:port/the/hdfs/path/to/FileA#FileA'
Run Code Online (Sandbox Code Playgroud)

(我想以下应该也可以,但我还没试过:)

-cacheFile '/the/hdfs/path/to/FileA#FileA'
Run Code Online (Sandbox Code Playgroud)

请注意,这#fileA是您用于使映射器可以使用该文件的名称.

然后,在你的映射器中,你将从中读取FileB sys.stdin(假设你使用了Hadoop Streaming -input '/user/foo/FileB'),并且要读取FileA,你应该这样做:

f = open('FileA', 'r')
...
f.readline()
Run Code Online (Sandbox Code Playgroud)

现在,我想你已经想到了这一点,但对我来说,有一个像这样的映射器是有道理的:

  1. 打开FileA
  2. 逐行读取FileA(在循环中)并将其加载到地图中,以便您可以轻松查找键并找到其值(是,否).
  3. 从stdin读取主循环.在循环内,对于每一行(在FileB中),检查您的地图(参见步骤2)以确定您是否有水果......等.


Vis*_*hal 4

请看一下这个示例,因为它与您正在寻找的内容非常直接相关。

  • +1。最初我有点怀疑这个例子是否真的按照建议的那样工作,因为作者隐藏了重要的部分,例如,分离分区和排序,这是通过“-jobconf stream.num.map.output.key.fields=4”完成的-jobconf map.output.key.field.separator=^ -jobconf num.key.fields.for.partition=1”。 (4认同)