Hive cluster by vs order by vs sort by

cas*_*ere 51 hadoop hive hql

据我所理解;

  • 只在reducer中排序

  • 按订单排序全球,但将所有东西都推到一个减速器中

  • 通过密钥散列智能地将东西分配到reducers中并进行排序

所以我的问题是集群保证全球秩序?分配通过将相同的密钥放入相同的减速器但是相邻的密钥呢?

我能在这里找到的唯一文件就是这里,从示例中可以看出它是全局命令的.但从定义来看,我觉得并不总是如此.

Lar*_*ken 146

一个较短的答案:是的,CLUSTER BY保证全局排序,只要您愿意自己加入多个输出文件.

版本较长:

  • ORDER BY x:保证全局排序,但通过仅通过一个reducer推送所有数据来实现此目的.这对于大型数据集来说基本上是不可接受的.您最终将一个已排序的文件作为输出.
  • SORT BY x:在N个减速器中的每个减少器处订购数据,但每个减速器可以接收重叠的数据范围.您最终得到N个或更多具有重叠范围的排序文件.
  • DISTRIBUTE BY x:确保每个N减速器都得到非重叠范围x,但不对每个减速器的输出进行排序.您最终会得到N个或更多未分类的文件,这些文件具有不重叠的范围.
  • CLUSTER BY x:确保N个减速器中的每一个都获得非重叠范围,然后按减速器的那些范围进行排序.这为您提供了全局排序,与执行(DISTRIBUTE BY xSORT BY x)相同.您最终得到N个或更多具有不重叠范围的排序文件.

合理?所以CLUSTER BY基本上是更具可扩展性的版本ORDER BY.

  • 正如其他答案所述,根据https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy,`CLUSTER BY`和`DISTRIBUTE BY`不能给你不重叠的范围.`CLUSTER BY`不能保证全球订购. (8认同)
  • 我的查询为`SORT BY`和`CLUSTER BY`返回相同的不需要的东西:reducers中的本地排序.我不得不求助于"ORDER BY"并等待整个周末,直到工作完成. (2认同)

Yev*_*kiy 13

首先让我澄清一下:clustered by只将密钥分配到不同的存储桶中,clustered by ... sorted by然后对存储桶进行排序.

通过简单的实验(见下文),您可以看到默认情况下您不会获得全局订单.原因是默认分区程序使用哈希码分割键,而不管实际的键排序.

但是,您可以完全订购数据.

动机是汤姆怀特的"Hadoop:The Definitive Guide"(第3版,第8章,第274页,总排序),他讨论了TotalOrderPartitioner.

我将首先回答你的TotalOrdering问题,然后描述我做过的几个与排序相关的Hive实验.

请记住:我在这里描述的是"概念证明",我能够使用Claudera的CDH3发行版处理一个例子.

最初我希望org.apache.hadoop.mapred.lib.TotalOrderPartitioner能够做到这一点.不幸的是它没有,因为它看起来像Hive分区的价值,而不是关键.所以我修补它(应该有子类,但我没有时间):

更换

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}
Run Code Online (Sandbox Code Playgroud)

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}
Run Code Online (Sandbox Code Playgroud)

现在您可以将(修补)TotalOrderPartitioner设置为您的Hive分区程序:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2
Run Code Online (Sandbox Code Playgroud)

我也用过

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;
Run Code Online (Sandbox Code Playgroud)

在我的测试中.

文件out_data2告诉TotalOrderPartitioner如何存储值.您可以通过对数据进行采样来生成out_data2.在我的测试中,我使用了4个桶和0到10的密钥.我使用ad-hoc方法生成了out_data2:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}
Run Code Online (Sandbox Code Playgroud)

然后我将结果out_data2复制到HDFS(到/ user/yevgen/out_data2)

通过这些设置,我得到了我的数据分段/排序(请参阅我的实验列表中的最后一项).

这是我的实验.

  • 创建样本数据

    bash> echo -e"1 \n3 \n2 \n4 \n5 \n7 \n6 \n8 \n9 \n0"> data.txt

  • 创建基本测试表:

    hive> create table test(x int); hive>将数据本地inpath'data.txt'加载到表测试中;

基本上,此表包含0到9之间没有顺序的值.

  • 演示表复制的工作原理(实际上是mapred.reduce.tasks参数,用于设置要使用的最大数量的reduce任务)

    hive> create table test2(x int);

    hive> set mapred.reduce.tasks = 4;

    hive> insert overwrite table test2 select ax from test a join test b on ax = bx; - 愚蠢的联合迫使非平凡的map-reduce

    bash> hadoop fs -cat/user/hive/warehouse/test2/000001_0

    1

    9

  • 展示喧嚣.您可以看到密钥是随机分配的,没有任何排序顺序:

    hive> create table test3(x int)由(x)聚集成4个桶;

    hive> set hive.enforce.bucketing = true;

    hive> insert overwrite table test3 select*from test;

    bash> hadoop fs -cat/user/hive/warehouse/test3/000000_0

    4

    8

    0

  • 用分拣打包.结果部分排序,未完全排序

    hive> create table test4(x int)由(x)聚类,由(x desc)排序为4个桶;

    hive> insert overwrite table test4 select*from test;

    bash> hadoop fs -cat/user/hive/warehouse/test4/000001_0

    1

    9

您可以看到值按升序排序.看起来像CDH3中的Hive bug?

  • 不使用cluster by语句进行部分排序:

    hive> create table test5 as select x from test distribution by x sort by x desc;

    bash> hadoop fs -cat/user/hive/warehouse/test5/000001_0

    9

    1

  • 使用我修补过的TotalOrderParitioner:

    hive> set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    hive> set total.order.partitioner.natural.order = false

    hive> set total.order.partitioner.path =/user/training/out_data2

    hive> create table test6(x int)由(x)聚类,按(x)排序为4个桶;

    hive> insert overwrite table test6 select*from test;

    bash> hadoop fs -cat/user/hive/warehouse/test6/000000_0

    1

    2

    0

    bash> hadoop fs -cat/user/hive/warehouse/test6/000001_0

    3

    4

    bash> hadoop fs -cat/user/hive/warehouse/test6/000002_0

    7

    6

    8

    bash> hadoop fs -cat/user/hive/warehouse/test6/000003_0

    9


Edi*_*ice 6

CLUSTER BY不产生全局排序。

接受的答案(Lars Yencken的说法)误导了减速器将接收不重叠的范围。正如Anton Zaviriukhin正确指向BucketedTables文档一样,CLUSTER BY基本上是每个存储桶/还原器中的DISTRIBUTE BY(与存储桶相同)加上SORT BY。并且DISTRIBUTE BY只是将哈希和mod散列到存储桶中,而哈希函数可以保留顺序(如果i> j,则i的哈希值> j的哈希值),而哈希值的mod不会。

这是显示重叠范围的更好示例

http://myitlearnings.com/bucketing-in-hive/


小智 5

据我了解,简短的回答是否定的。你会得到重叠的范围。

来自SortBy 文档:“Cluster By 是 Distribute By 和 Sort By 的捷径。” “具有相同 Distribute By 列的所有行都将进入相同的减速器。” 但是没有保证非重叠范围分布的信息。

此外,来自DDL BucketedTables 文档:“Hive 如何跨存储桶分配行?通常,存储桶编号由表达式 hash_function(bucketing_column) mod num_buckets 确定。” 我想 Cluster by in Select 语句使用相同的原则在 reducer 之间分配行,因为它的主要用途是用数据填充分桶表。

我创建了一个包含 1 个整数列“a”的表,并在其中插入了 0 到 9 的数字。

然后我将减速器的数量设置为 2 set mapred.reduce.tasks = 2;

select这个表中的数据 withCluster by子句 select * from my_tab cluster by a;

并收到了我预期的结果:

0
2
4
6
8
1
3
5
7
9
Run Code Online (Sandbox Code Playgroud)

所以,第一个减速器(数字 0)得到偶数(因为它们的模式 2 给出了 0)

和第二个减速器(编号 1)得到奇数(因为它们的模式 2 给出了 1)

这就是“分发方式”的工作原理。

然后“排序依据”对每个减速器内的结果进行排序。