No MapReduce
Hive SQL does not run MapReduce for every statement.
Basic queries, i.e. queries that involve no computation (such as simply reading a partitioned table), do not start a MapReduce job at all, as the Fetch Operator in the plan below shows:
explain select * from dept_et limit 1;
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: 1
Processor Tree:
TableScan
alias: dept_et
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: int), name (type: string), city (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
Limit
Number of rows: 1
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
ListSink
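Which queries take this fetch-only path is governed by hive.fetch.task.conversion (values none / minimal / more). A quick way to see the difference, assuming Hive 0.14+ where the default is more:
hive> set hive.fetch.task.conversion=none;    -- force every query through MapReduce
hive> explain select * from dept_et limit 1;  -- the plan now contains a Map Reduce stage
hive> set hive.fetch.task.conversion=more;    -- restore the default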
Join
shuffle-join
shuffle-join overview
common-join, shuffle-join, and reduce-join all mean the same thing.
As the name suggests, a shuffle-join performs the join through a shuffle (see the separate Hadoop MapReduce notes for details).
shuffle-join core idea
It relies on the shuffle guarantee that equal Keys land in the same partition: use the join key as the shuffle Key, shuffle rows with the same Key (the same join-key value) together, and complete the join on the reduce side.
When shuffle-join applies
shuffle-join is the general-purpose join: it can join any two tables regardless of their relative sizes.
It is best suited to two large tables (each too big to fit in memory) that are still not enormous:
- If one table is large and one is small, a map-join is more efficient, because the shuffle is expensive.
- If both tables are extremely large, the shuffle cost becomes prohibitive, or the tables are so big that not even the Keys fit in memory and the join cannot finish at all; use a sort-merge-join instead.
shuffle-join, step by step
- Each table runs its own map phase; the map output is the key-value pair (join key, the column values still needed after column pruning).
- MapReduce's own shuffle groups identical map-output Keys together.
- On the reduce side, rows with the same Key are already grouped, so the reducer simply merges the values of each Key and emits the result.
  Note that the values for one Key may come from different nodes and different partitions.
- The final reduce output is the join result.
shuffle-join execution plan demo
--Disable Hive's automatic join conversion (otherwise the data here is too small to demonstrate a shuffle-join)
hive> set hive.auto.convert.join=false;
hive> explain select d.deptno,d.dname,e.ename from emp e join dept d on d.deptno = e.deptno;
OK
--Job flow: Stage-1 -> Stage-0
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
--Table scan of the emp table
--A shuffle-join reads tables in join order, front to back (reordering the join changes this read order),
--so one optimization is to put (relatively) small tables first and the large table last
TableScan
alias: e
Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE
--The join's implicit condition: the join key must not be null
Filter Operator
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
--shuffle-join map output for table No.1: (deptno, ename)
Reduce Output Operator
key expressions: deptno (type: int)
sort order: +
Map-reduce partition columns: deptno (type: int)
Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
value expressions: ename (type: string)
--Table scan of the dept table
TableScan
alias: d
Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
Filter Operator
--The same implicit join condition: the join key must not be null
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
--Map output for table No.2: (deptno, dname)
Reduce Output Operator
key expressions: deptno (type: int)
sort order: +
Map-reduce partition columns: deptno (type: int)
Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
value expressions: dname (type: string)
--Both tables' map phases are finished; the reduce phase begins
Reduce Operator Tree:
--Reduce operator definition. Here the reduce operation is the join itself
--Note: in a shuffle-join, the join happens in the reduce phase
Join Operator
condition map:
--Join condition: 0 (table No.1 deptno) to 1 (table No.2 deptno)
Inner Join 0 to 1
keys:
0 deptno (type: int)
1 deptno (type: int)
--The reduce output is the select-list columns from tables No.1 and No.2 (d.deptno, d.dname, e.ename)
outputColumnNames: _col1, _col11, _col12
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
--The join is complete; now handle the select clause (only two tables are joined here, so nothing follows)
Select Operator
expressions: _col11 (type: int), _col12 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
File Output Operator
-- compressed: false, the output is not compressed
-- input format: read as plain text
-- output format: the Key is not written (in a join the Key is only intermediate data; only the Value is emitted)
-- serde: LazySimpleSerDe, the built-in serializer/deserializer for TEXTFILE
compressed: false
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
--Built-in final fetch stage, Stage-0
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.78 seconds, Fetched: 59 row(s)
map-join
map-join overview
In Spark, a map-join is called a broadcast-join.
The map-join is the ideal join: it avoids the expensive shuffle entirely and completes the join in the map phase.
map-join core idea
The core idea is to hand one table of the join (the small table), in the form of a hash table, to the other table's map phase, so the join completes directly in the map phase as a hash join.
When map-join applies
As the core idea implies, a map-join requires a small table.
Note: "small" here is absolute, not relative. It does not mean table A is much smaller than table B; it means the table must be small enough that its entire HashTable fits in memory.
map-join configuration parameters
Parameter | Description
---|---
hive.mapjoin.smalltable.filesize | Maximum small-table file size in bytes; default 25000000 (about 23.84 MB)
hive.ignore.mapjoin.hint | Default true; whether to ignore the MAPJOIN hint
hive.auto.convert.join.noconditionaltask | Default true; when converting common joins to map-joins, whether to also merge multiple map-joins into one
hive.auto.convert.join.noconditionaltask.size | Maximum combined table size when merging multiple map-joins into one; default 10000000
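As a usage sketch (using the emp and dept tables from the demo below): the conversion is normally driven by hive.auto.convert.join, and the classic MAPJOIN hint is honored only after turning off hive.ignore.mapjoin.hint:
hive> set hive.auto.convert.join=true;               -- convert eligible joins to map-joins automatically
hive> set hive.mapjoin.smalltable.filesize=25000000; -- what counts as "small", in bytes
hive> set hive.ignore.mapjoin.hint=false;            -- make Hive honor MAPJOIN hints
hive> select /*+ MAPJOIN(d) */ d.deptno,d.dname,e.ename
    > from emp e join dept d on d.deptno = e.deptno;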
map-join, step by step
- At join time, Hive detects via the MetaStore that a small table is involved.
- The small table is built into a HashTable, with the join key as the Key and the small table's pruned output columns as the Value.
- The HashTable is uploaded to distributed storage that every node can reach.
  This is usually the distributed cache, though HDFS works too.
  What matters is that any node (whichever one YARN picks to run a big-table map task) can fetch this HashTable.
- The big table's map tasks pull the HashTable into memory; in the map phase each big-table join key is probed against the HashTable, and the row is kept as map output on a hit and skipped on a miss.
- The output of the big table's completed map phase is the join result.
  Note: there is no shuffle phase and no reduce phase.
map-join execution plan demo
hive> explain select d.deptno,d.dname,e.ename from emp e join dept d on d.deptno = e.deptno;
OK
-- Job flow: Stage-4 (map the small table) -> Stage-3 (map the big table) -> Stage-0 (built-in fetch)
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
--Job No.1: map the small table
Stage: Stage-4
--Because the dept table is tiny (79 bytes), Hive automatically chooses a map-join
Map Reduce Local Work
Alias -> Map Local Tables:
--Note that dept is chosen as the small table (however the operands are ordered; here it even comes last)
--because dept (79 bytes) is smaller than emp (700 bytes), so building the HashTable from dept is cheaper
d
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
d
--Scan dept and build the HashTable
TableScan
alias: d
Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
--Scan predicate
--Note the deptno is not null filter: the join's implicit condition that the join key must not be null
Filter Operator
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
--The HashTable build; the key is the join key deptno
--Two key entries are kept to mirror the join structure; entry 1's deptno is for the big-table stage that follows
HashTable Sink Operator
keys:
0 deptno (type: int)
1 deptno (type: int)
--Job No.2: scan the big table
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: e
Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE
--Scan predicate: a join always carries the implicit condition that the join key must not be null
Filter Operator
predicate: deptno is not null (type: boolean)
Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE
--The hash-join is done directly in memory; the condition is 0 (deptno) to 1 (deptno)
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 deptno (type: int)
1 deptno (type: int)
-- Output columns: exactly the select-clause contents
outputColumnNames: _col1, _col11, _col12
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col11 (type: int), _col12 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
-- Output is written to a file
-- MapReduce always materializes files; even a console query writes a temporary file
File Output Operator
-- Output as TEXTFILE
-- compressed: false, the output is not compressed
-- input format: read as plain text
-- output format: the Key is not written (in a join the Key is only intermediate data; only the Value is emitted)
-- serde: LazySimpleSerDe, the built-in serializer/deserializer for TEXTFILE
compressed: false
Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Local Work:
Map Reduce Local Work
--Built-in final fetch stage, Stage-0
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.071 seconds, Fetched: 62 row(s)
sort-merge-join
sort-merge-join overview
In Hive, the sort-merge-join is implemented as the sort-merge-bucket-join (SMB join).
sort-merge-join core idea
A sort-merge-join sorts both tables on the join key into two temporary sorted sequences a and b, then walks both sequences from the start: when the current Keys match, the matching rows are output; when they differ, the side whose current Key is smaller advances.
When sort-merge-join applies
As the core idea shows, a sort-merge-join reads each row once and then discards it, so neither table ever has to be held in full. That makes it especially suitable for joining two very large tables.
sort-merge-join, step by step
- At join time, Hive detects via the MetaStore that both tables are very large.
- Both tables are sorted on the join key (ascending) and written out as two temporary sorted files, a and b.
- The two sorted files a and b are walked from the start in parallel: when the current Keys match, the rows are joined and output (to a temporary file); when they differ, the current Keys are compared and the side with the smaller Key advances.
- When both inputs are exhausted, everything emitted during the walk is the join result (a setup sketch follows this list).
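Hive only applies an SMB join when both tables are bucketed and sorted on the join key (with equal bucket counts, or counts that are multiples of each other) and the relevant switches are on. A minimal setup sketch; the bucketed table names here are hypothetical:
-- both tables bucketed and sorted on the join key
create table emp_b (empno int, ename string, deptno int)
clustered by (deptno) sorted by (deptno) into 4 buckets;
create table dept_b (deptno int, dname string)
clustered by (deptno) sorted by (deptno) into 4 buckets;
set hive.enforce.bucketing=true;            -- enforce bucketing while loading (pre-Hive-2.0)
set hive.enforce.sorting=true;              -- enforce sort order while loading
set hive.auto.convert.sortmerge.join=true;  -- allow conversion to a sort-merge-bucket join
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
select d.deptno,d.dname,e.ename from emp_b e join dept_b d on d.deptno = e.deptno;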
group by
explain select city,sum(id) from dept_et group by city;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
<!--Stage definition: one stage corresponds to one MapReduce job-->
Stage: Stage-1
<!--Map phase-->
Map Reduce
Map Operator Tree:
TableScan //table scan
alias: dept_et
Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE //estimated statistics for table dept_et
Select Operator //column pruning: only the two columns city (type: string) and id (type: int) are needed
expressions: city (type: string), id (type: int)
outputColumnNames: city, id
Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
<!--Map-side operator definition:
group by city (type: string) in an in-memory hash table (mode: hash), computing sum(id); the result is _col0, _col1, i.e. (city, partial sum(id))-->
Group By Operator
aggregations: sum(id) //the per-group aggregate function: sum(id)
keys: city (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
<!--Map output-->
Reduce Output Operator
key expressions: _col0 (type: string) //the map output Key is _col0 (city)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: bigint) //the map output Value is _col1 (the partial sum(id))
<!--Reduce phase: merges the outputs of all the maps.
Keyed by _col0 (the map-output city), it computes sum(VALUE._col0) (the sum of the map-side partial sums); the result is again _col0, _col1, i.e. (city, sum(id))-->
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial //merge the partial aggregates produced by the maps
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
<!--Reduce output: written to a temporary file, uncompressed-->
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
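The map-side Group By Operator above (mode: hash) is Hive's map-side partial aggregation, controlled by hive.map.aggr (default true). Turning it off shows the contrast in the plan:
hive> set hive.map.aggr=false;  -- disable map-side partial aggregation
hive> explain select city,sum(id) from dept_et group by city;
-- the map-side Group By Operator disappears: raw (city,id) pairs are shuffled
-- and the whole sum(id) is computed on the reduce side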
distinct
A single distinct
select city,count(distinct(name)) from dept_et group by city;
With a single distinct, Hive combines the group-by column and the distinct column into the map output Key, partitions by the group-by column alone as the Reduce Key, and keeps a LastKey on the reduce side to do the de-duplicated count.
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
<!--Map-side definition
Input: table scan of dept_et, selecting the raw city and name values
Processing: key on the group column (city) plus the distinct column (name), evaluating count(DISTINCT name)
Output: _col0, _col1, _col2, i.e. (city, name, count(DISTINCT name))-->
Map Operator Tree:
TableScan
alias: dept_et
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: city (type: string), name (type: string) //no function applied; the raw values are selected
outputColumnNames: city, name
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count(DISTINCT name)
keys: city (type: string), name (type: string)
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: string)
sort order: ++
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
<!--Reduce-side definition
Receives the map output, keys on _col0 (city) alone, and aggregates once more (a de-duplicated count of name per city); the result is written to a temporary file-->
Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
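Because all de-duplication for a group rides on a single reduce Key, a skewed distinct count is often rewritten as two group-bys: de-duplicate (city, name) first, then count. A sketch of that common rewrite against the same table:
hive> select city,count(*)
    > from (select city,name from dept_et group by city, name) t
    > group by city;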