Apache Pig:FLATTEN和reducers的并行执行

use*_*640 35 hadoop apache-pig

我已经实现了Apache Pig脚本.当我执行脚本时,它会为特定步骤生成许多映射器,但该步骤只有一个reducer.由于这种情况(许多映射器,一个减速器),Hadoop集群几乎处于空闲状态,而单个减速器执行时.为了更好地使用集群的资源,我希望还有许多并行运行的reducer.

即使我使用SET DEFAULT_PARALLEL命令在Pig脚本中设置并行性,我仍然只有1个reducer.

发出问题的代码部分如下:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);
Run Code Online (Sandbox Code Playgroud)

'inputData'和'inputDataGrouped'别名在映射器中计算.

减速器中的'pair'和'pairsFlat'.

如果我通过使用FLATTEN命令删除行来更改脚本(pairsFlat = FOREACH对GENERATE FLATTEN(pairs_bag)AS(item1:int,item2:int);)然后执行会产生5个reducers(因此在并行执行中) .

似乎FLATTEN命令是问题,并避免创建许多reducer.

我怎么能达到FLATTEN的相同结果,但是脚本是并行执行的(有很多减速器)?

编辑:

有两个FOREACH时的EXPLAIN计划(如上所述):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false
Run Code Online (Sandbox Code Playgroud)

只有一个FOREACH与FLATTEN包装UDF时的EXPLAIN计划:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
Run Code Online (Sandbox Code Playgroud)

Tan*_*eer 1

我认为数据存在偏差。只有少数映射器产生指数级的大输出。查看数据中键的分布。类似的数据包含很少的组但有大量的记录。