DAG 概述
DAG 的本意是有向无环图,在Spark中DAG 特指的是Spark作业执行的计划
Spark中执行的本质是分组的并且组之间有依赖的func
- func 是Spark的算子(无论是Action还是Transform算子的本质都是一个func)
- func组的概念是func按照Stage进行分组
- func组之内的是一条有向串行线,func组之间则可能出现一对多或多对多的有向依赖关系
这样func之间就呈现出有向无环图的特征
- 有向是指算子func和Stage的执行流向
- 无环是指流向执行时不会出现倒退死循环
DAG 的绘制
- DAG绘制的展开是由RDD的Action算子的触发(
sc.rubJob
)
这也是为什么Transform是Lazy的,必须由Action触发才能执行的原因
因为只有Action才会触发DAG绘制并交予调度执行 - 绘制DAG的过程首先是收集触发绘制的Action算子,包装成为一个Job(一次作业执行的最小单元)
Job
是一次作业执行的最小单元,一个Action算子触发一个Job
Job
会生成一个默认Stage(Stage是调度执行的最小单元,至少会生成一个默认Stage)
默认Stage为FinalStage
(FinalStage
是一个ResultStage
,待会说) - 绘制
FinalStage
细节
绘制FinalStage
从触发的Action开始,向上溯源遍历(RDD依赖链,算子在RDD中)
溯源遍历过程中,使用一个ArrayStack
暂存算子(方便倒装,Action溯源是一种逆序溯源)
每个溯源都会判断是否有shuffle(判断标准是RDD的Dependency是否是ShuffleDependency)
如果没有shuffle就会将当前算子加入当前Stage,然后继续溯源(窄依赖)
如果是shuffle就新建Stage(ShuffleStage
),作为当前Stage的依赖Stage,然后在新Stage中继续溯源(宽依赖产生独立Stage)
如果溯源过程判断某一个RDD是已经溯源过的就会就此停止对这个RDD溯源(无环保证) - 溯源完毕后的结果是
StageGroup
,就是DAG
DAG 的执行
,接下来说下这个StageGroup如何调度执行的
调度执行架构
Spark中调度执行是一个类似YARN
的主从调度架构,其核心类TaskScheduler
在Spark启动后,会在driver
和 executor
分别启动两个不同功能的线程
- driver中启动
TaskScheduleBackEnd
接收executor注册与心跳,维护可用executor元数据 - executor启动时同时启动
ScheduleExecutorBackEnd
线程
通过RPC向driver注册并维持心跳
等待接收driver的任务执行指令
DAG 解析
DAG绘制的结构是 StageGroup
,但在调度执行层面是按照Stage为单位进行执行的
- DAG的执行开始是在
DAGScheduler.submitMissStage
中
StageGroup可能包含多个Stage,但执行是串行的(执行完毕一个Stage,再执行另一Stage,在源码中就是如果检测到当前MissStage不为空,就会自旋等待)
这也是窄依赖比宽依赖好的地方之一,窄依赖内部是完全并行执行的,宽依赖则不得不等待上一个Stage执行完毕才能开始下一步 - 解析Stage
DAGScheduler.submitMissStage
解析Stage,作为后续任务执行的核心
执行过程 RDD的算子链序列化(就是RDD的遍历函数func的反射执行位置)
分区数 解析Stage的分区数(RDD分区数),作为后续Task数量(一个分区一个Task)
数据所在地 数据源所在地或者缓存所在地,作为后续调度Task本地化的参考 - 构建TaskSet
将Stage的执行过程
,分区数
,数据所在地
等核心包装为分区数
个Task
如果是ResultStage
就会包装成ResultTask
如果是ShuffleStage
就会包装成ShuffleTask
最终Stage会被解析成分区数
个Task组成的TaskSet - 调度TaskSet(
TaskScheduler.submitTask
)
遍历TaskSet的每一个Task,调度执行
executor 首先会从driver中拿到当前保持心跳存活的executor列表
黑名单过滤 executor列表中过滤掉处于黑名单中的executor
本地化过滤 executor列表按照Task所在地
根据本地化策略排序(executor,node,集群..)
顺序尝试 按照本地化排序后的executor通过RPC逐个尝试(TaskScheduler.lauchTask
)(扣核能否成功,成功则就发给该executor执行,失败就尝试下一个.注意这里的是超时自旋,所以本地化等待时间是一个非常重要的优化点)
最终Task会与一个Executor完成绑定过程(TaskDescription
),这样最终完成调度过程 - 发送执行
因为TaskDescription
最终已经完成了Task的调度过程,发送执行则非常简单了
向目标executor通过RPC发送TaskDescription
,要求executor执行 - executor接收任务执行
executor通过RPC接收到任务TaskDescription
,会还原任务并提交给自己的线程池执行
在任务执行完毕后根据Task类型决定后续如何进行
ResultTask
执行完毕后,executor会发回执行结果
ShuffleTask
执行完毕后,executor会将结果存储在自身BlockManager
中,将BlockId
发回driver