
Hive Execution Plans


No MapReduce

Not every HiveQL statement runs MapReduce.
Basic queries, i.e. queries that involve no computation (such as reading a partitioned table directly), do not launch a MapReduce job at all; they are served by a plain fetch task.

explain select * from dept_et limit 1;

STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: 1
      Processor Tree:
        TableScan
          alias: dept_et
          Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: id (type: int), name (type: string), city (type: string)
            outputColumnNames: _col0, _col1, _col2
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            Limit
              Number of rows: 1
              Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              ListSink
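
Whether a simple query stays fetch-only is controlled by hive.fetch.task.conversion (none / minimal / more; the default is more in recent Hive versions). A quick sketch:

set hive.fetch.task.conversion=none;   -- force every query through MapReduce
explain select * from dept_et limit 1; -- the plan now contains a MapReduce stage instead of a plain fetch
set hive.fetch.task.conversion=more;   -- the default in recent versions: simple select/filter/limit stays fetch-only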

Join

shuffle-join

shuffle-join overview

common-join, shuffle-join, and reduce-join all mean the same thing.
A shuffle-join, as the name suggests, is a join with a shuffle in the middle (see the hadoop-MapReduce-详解 post for the details of shuffling).

shuffle-join core idea

It relies on the shuffle guarantee that identical keys always land in the same partition: the join key of each table becomes the shuffle key, so rows with the same key (the same join-key value) are shuffled together, and the join itself is then completed on the reduce side.

When shuffle-join applies

shuffle-join is the general-purpose join: it can join any two tables, regardless of their relative sizes.
It fits best when both tables are large (neither fits in memory) but not extremely large.
If one table is large and the other small, a map-join is more efficient, because shuffling is expensive.
If both tables are extremely large, the shuffle cost becomes prohibitive, or the tables may be so large that even the keys do not fit in memory and the join cannot complete at all; in that case use a sort-merge-join.

The full shuffle-join process

  1. Each table runs its own map phase; the map output is the key-value pair (join key, the column values still needed after column pruning).
  2. MapReduce's own shuffle brings all map outputs with the same key together.
  3. The reduce phase runs: since rows with the same key are now together, it simply takes each key and merges its values as the reduce output.
    Note that the values of a single key may come from different nodes and different partitions.
  4. The final reduce output is the join result.

shuffle-join execution plan demo

--Disable Hive's automatic join conversion (otherwise the data set is too small to demonstrate a shuffle-join)
hive> set hive.auto.convert.join=false;
hive> explain select d.deptno,d.dname,e.ename from emp e join dept d on d.deptno = e.deptno;

OK
--Build the job flow: Stage-1 -> Stage-0
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          --Table scan of the emp table
          --A shuffle-join reads the tables in join order, front to back (reordering the join changes the read order),
          --so one optimization is to put the (relatively) smaller tables first and the big table last
          TableScan
            alias: e
            Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE
            --The implicit join condition: the join key must not be null
            Filter Operator
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
              --shuffle-join: map output of table No.1, (deptno, ename)
              Reduce Output Operator
                key expressions: deptno (type: int)
                sort order: +
                Map-reduce partition columns: deptno (type: int)
                Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
                value expressions: ename (type: string)
          --Table scan of the dept table
          TableScan
            alias: d
            Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              --The same implicit join condition: the join key must not be null
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
              --Map output of table No.2, (deptno, dname)
              Reduce Output Operator
                key expressions: deptno (type: int)
                sort order: +
                Map-reduce partition columns: deptno (type: int)
                Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
                value expressions: dname (type: string)
      --Both tables' map phases are done; the reduce phase begins
      Reduce Operator Tree:
        --Definition of the reduce operation; here the reduce operation is the join itself
        --Note: in a shuffle-join, the join happens in the reduce phase
        Join Operator
          condition map:
               --The join condition: 0 (table No.1 deptno) to 1 (table No.2 deptno)
               Inner Join 0 to 1
          keys:
            0 deptno (type: int)
            1 deptno (type: int)
          --The reduce output: the select-clause columns belonging to tables No.1 and No.2 (d.deptno, d.dname, e.ename)
          outputColumnNames: _col1, _col11, _col12
          Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
          --The join is done; now the select clause is processed (only two tables are joined here, so nothing follows)
          Select Operator
            expressions: _col11 (type: int), _col12 (type: string), _col1 (type: string)
            outputColumnNames: _col0, _col1, _col2
            Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              -- compressed: false (the output is not compressed)
              -- input format: read as plain text
              -- output format: the key is dropped (for a join the key is only intermediate data; only the value is written)
              -- serde: serializer/deserializer; LazySimpleSerDe is the built-in serde for TEXTFILE
              compressed: false
              Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  --The built-in terminal stage, Stage-0
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.78 seconds, Fetched: 59 row(s)

map-join

map-join overview

In Spark, map-join is known as broadcast-join.
map-join is the ideal join case, because it avoids the expensive shuffle entirely and completes the join in the map phase.

map-join core idea

The core idea of map-join is to hand one table of the join (the small table), in the form of a hash table, to the map phase of the other table, so that the join is completed directly in the map phase as a hash join.

When map-join applies

As the core idea implies, map-join requires that a small table exist.

Note: "small" here is absolute, not relative. It is not enough for table A to be much smaller than table B; the table must be small enough that its entire hash table fits in memory.

map-join configuration parameters

Parameter                                       Description
hive.mapjoin.smalltable.filesize                Maximum file size, in bytes, for a table to qualify as the small table; default 25000000 (about 23.84 MB)
hive.ignore.mapjoin.hint                        Whether to ignore the MAPJOIN hint; default true
hive.auto.convert.join.noconditionaltask        When converting common joins to map-joins, whether to merge multiple map-joins into a single one; default true
hive.auto.convert.join.noconditionaltask.size   Maximum combined table size when merging multiple map-joins into one; default 10000000
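
A minimal sketch of how these knobs interact, reusing the emp/dept tables from above: a map-join normally happens through automatic conversion, but it can also be requested explicitly with a MAPJOIN hint, which Hive honors only when hive.ignore.mapjoin.hint is set to false:

-- Automatic conversion (the default): any table smaller than
-- hive.mapjoin.smalltable.filesize becomes the map-join small table
set hive.auto.convert.join=true;

-- Or request it explicitly; the hint is honored only when
-- hive.ignore.mapjoin.hint is false
set hive.ignore.mapjoin.hint=false;
select /*+ MAPJOIN(d) */ d.deptno, d.dname, e.ename
from emp e join dept d on d.deptno = e.deptno;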

The full map-join process

  1. At join time, Hive detects through the MetaStore that a small table is involved.
  2. The small table is built into a hash table: the join key is the key, and the value holds the fields the small table must output after column pruning.
  3. The hash table is uploaded to storage that every node can reach.
    This is usually the distributed cache, though HDFS also works.
    What matters is that any node (whichever big-table map node YARN happens to pick) can pull this hash table.
  4. The big table's map tasks pull the hash table into memory; during the map phase, each big-table row's join key is probed against the hash table. If it is found, the joined row is emitted as map output; otherwise the row is skipped.
  5. The big table's final map output is the join result.
    Note that there is no shuffle phase and no reduce phase here.

map-join execution plan demo


hive> explain select d.deptno,d.dname,e.ename from emp e join dept d on d.deptno = e.deptno;
OK

-- Build the MR job flow: Stage-4 (map the small table) -> Stage-3 (map the big table) -> Stage-0 (built-in terminal stage)
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  --Job No.1: map the small table
  Stage: Stage-4
    --Because the dept table is tiny (79 bytes), Hive automatically chooses a map-join
    Map Reduce Local Work
      Alias -> Map Local Tables:
    --Note that dept is chosen as the small table no matter how the join is ordered (here it is even listed last),
    --because dept (79 bytes) is smaller than emp (700 bytes), so building the hash table from dept is cheaper
        d 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        d 
          --Scan the dept table and build the hash table
          TableScan
            alias: d
            Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
            --Scan predicate
            --Note the deptno is not null predicate: the implicit join condition that the join key must not be null
            Filter Operator
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
              --Build the hash table; the key is the join key deptno
              --Two key entries are kept to mirror the join structure; entry 1 (deptno) is really for the big table scanned later
              HashTable Sink Operator
                keys:
                  0 deptno (type: int)
                  1 deptno (type: int)
  --Job No.2: scan the big table
  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: e
            Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE
            --Scan predicate; a join always carries the implicit condition that the join key must not be null
            Filter Operator
              predicate: deptno is not null (type: boolean)
              Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
              --Perform the hash join directly in memory; the condition is 0 (deptno) to 1 (deptno)
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 deptno (type: int)
                  1 deptno (type: int)
                -- The output columns: exactly the contents of the select clause
                outputColumnNames: _col1, _col11, _col12
                Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col11 (type: int), _col12 (type: string), _col1 (type: string)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
                 -- The output is written as a file
                 -- MR always writes files; even a console query writes a temporary file
                  File Output Operator
                  -- output as TEXTFILE
                  -- compressed: false (the output is not compressed)
                  -- input format: read as plain text
                  -- output format: the key is dropped (for a join the key is only intermediate data; only the value is written)
                  -- serde: serializer/deserializer; LazySimpleSerDe is the built-in serde for TEXTFILE
                    compressed: false
                    Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
                    
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work
  --The built-in terminal stage, Stage-0
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.071 seconds, Fetched: 62 row(s)

sort-merge-join

sort-merge-join overview

In Hive, sort-merge-join is implemented as sort-merge-bucket-join (the SMB join).

sort-merge-join core idea

sort-merge-join first sorts both tables by the join key into two temporary sorted sequences a and b, then walks both sequences from the start: when the current keys match, the joined row is output; when they differ, the two current keys are compared and the side with the smaller key advances.

When sort-merge-join applies

As the core idea shows, sort-merge-join reads and discards as it goes: it never needs to hold either table in full. This makes it especially suitable for joining two very large tables.

The full sort-merge-join process

  1. At join time, Hive detects through the MetaStore that both tables are very large.
  2. Both tables are sorted by the join key (ascending), producing two temporary sorted files a and b.
  3. Once sorting is done, both sorted files are walked from the start. The rule: when the current keys are equal, the match is joined and output (to a temporary file); when they differ, the two current keys are compared and the side with the smaller key advances.
  4. When both files have been read through, everything output during the walk is the join result.
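
In practice Hive does not sort per query: the SMB join relies on both tables being bucketed and sorted on the join key at load time. A minimal sketch of the required table layout and settings (the table names big_a/big_b are hypothetical; the configuration keys are standard Hive ones):

-- Both tables bucketed and sorted on the join key, with matching bucket counts
create table big_a (id int, val string)
  clustered by (id) sorted by (id asc) into 32 buckets;
create table big_b (id int, val string)
  clustered by (id) sorted by (id asc) into 32 buckets;

-- Enable the SMB conversion
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

select a.id, a.val, b.val from big_a a join big_b b on a.id = b.id;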

group by

explain select city,sum(id) from dept_et group by city;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  <!-- Stage definition: one stage corresponds to one MapReduce job -->
  Stage: Stage-1
    <!-- The map phase -->
    Map Reduce
      Map Operator Tree:
          TableScan // table scan
            alias: dept_et
            Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE // estimated statistics for dept_et
            Select Operator // column pruning: only city (type: string) and id (type: int) are needed
              expressions: city (type: string), id (type: int)
              outputColumnNames: city, id
              Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              <!-- Map-side aggregation:
                    hash of city (type: string) is the key, sum(id) is evaluated; the result is _col0, _col1, i.e. (hash(city), sum(id)) -->
              Group By Operator 
                aggregations: sum(id) // the per-group aggregate function => sum(id)
                keys: city (type: string) 
                mode: hash 
                outputColumnNames: _col0, _col1 
                Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                <!-- The map-side output -->
                Reduce Output Operator 
                  key expressions: _col0 (type: string) // the map output key: _col0 (hash(city))
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: bigint) // the map output value: _col1 (the partial sum(id))
      <!-- The reduce phase merges the outputs of multiple maps:
            keyed by _col0 (the map output hash(city)), it evaluates sum(VALUE._col0) (the maps' partial sum(id) values); the result is again _col0, _col1, i.e. (hash(city), sum(sum(id))) -->
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial // merge the partial results coming from multiple maps
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
          <!-- The reduce-side output: a temporary file, uncompressed -->
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
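
The map-side Group By Operator with mode: hash above exists because hive.map.aggr (default true) enables partial aggregation in the mappers; the reduce side then runs mergepartial. A sketch of the two related knobs, shown with their usual defaults:

set hive.map.aggr=true;            -- partial aggregation in the mappers (the mode: hash operator)
set hive.groupby.skewindata=false; -- true adds an extra MR job that first spreads skewed group keys
explain select city,sum(id) from dept_et group by city;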

distinct

A single distinct

select city,count(distinct(name)) from dept_et group by city; with only one distinct, Hive combines the group-by column and the distinct column into the map output key, uses the group-by column alone as the reduce (partition) key, and de-duplicates in the reduce phase by remembering the LastKey.

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      <!-- Map-side definition.
            Input: table scan of dept_et, selecting the raw values of city and name.
            Processing: group(city) plus distinct(name) form the key, and count(DISTINCT name) is evaluated.
            Output: _col0, _col1, _col2, i.e. (city, name, count(DISTINCT name)) -->
      Map Operator Tree:
          TableScan
            alias: dept_et
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: city (type: string), name (type: string) // no aggregate function here, just the raw column values
              outputColumnNames: city, name
              Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(DISTINCT name)
                keys: city (type: string), name (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
      <!-- Reduce-side definition:
            receives the map output, aggregates again with _col0 as the key (a de-duplicated count of name per city), and writes the result to a temporary file -->
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col1:0._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
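
Because the group-by column alone is the partition key, all distinct name values of one city funnel into a single reducer, which can bottleneck on skewed data. A common rewrite, sketched against the same dept_et table, de-duplicates with an inner group by first and then counts:

select city, count(name)
from (select city, name from dept_et group by city, name) t
group by city;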