NightPxy 个人技术博客

Spark-DAG和调度执行

Posted on By NightPxy

DAG 概述

DAG 的本意是有向无环图,在Spark中DAG 特指的是Spark作业执行的计划
Spark中执行的本质是分组的并且组之间有依赖的func

  • func 是Spark的算子(无论是Action还是Transform算子的本质都是一个func)
  • func组的概念是func按照Stage进行分组
  • func组之内的是一条有向串行线,func组之间则可能出现一对多或多对多的有向依赖关系

这样func之间就呈现出有向无环图的特征

  • 有向是指算子func和Stage的执行流向
  • 无环是指流向执行时不会出现倒退死循环

DAG 的绘制

  • DAG绘制的展开是由RDD的Action算子的触发(sc.rubJob)
    这也是为什么Transform是Lazy的,必须由Action触发才能执行的原因
    因为只有Action才会触发DAG绘制并交予调度执行
  • 绘制DAG的过程首先是收集触发绘制的Action算子,包装成为一个Job(一次作业执行的最小单元)
    Job是一次作业执行的最小单元,一个Action算子触发一个Job
    Job会生成一个默认Stage(Stage是调度执行的最小单元,至少会生成一个默认Stage)
    默认Stage为FinalStage (FinalStage是一个ResultStage,待会说)
  • 绘制FinalStage细节
    绘制FinalStage从触发的Action开始,向上溯源遍历(RDD依赖链,算子在RDD中)
    溯源遍历过程中,使用一个ArrayStack暂存算子(方便倒装,Action溯源是一种逆序溯源)
    每个溯源都会判断是否有shuffle(判断标准是RDD的Dependency是否是ShuffleDependency)
    如果没有shuffle就会将当前算子加入当前Stage,然后继续溯源(窄依赖)
    如果是shuffle就新建Stage(ShuffleStage),作为当前Stage的依赖Stage,然后在新Stage中继续溯源(宽依赖产生独立Stage)
    如果溯源过程判断某一个RDD是已经溯源过的就会就此停止对这个RDD溯源(无环保证)
  • 溯源完毕后的结果是StageGroup,就是DAG

DAG 的执行

,接下来说下这个StageGroup如何调度执行的

调度执行架构

Spark中调度执行是一个类似YARN的主从调度架构,其核心类TaskScheduler
在Spark启动后,会在driverexecutor分别启动两个不同功能的线程

  • driver中启动TaskScheduleBackEnd
    接收executor注册与心跳,维护可用executor元数据
  • executor启动时同时启动ScheduleExecutorBackEnd线程
    通过RPC向driver注册并维持心跳
    等待接收driver的任务执行指令

DAG 解析

DAG绘制的结构是 StageGroup,但在调度执行层面是按照Stage为单位进行执行的

  • DAG的执行开始是在DAGScheduler.submitMissStage
    StageGroup可能包含多个Stage,但执行是串行的(执行完毕一个Stage,再执行另一Stage,在源码中就是如果检测到当前MissStage不为空,就会自旋等待)
    这也是窄依赖比宽依赖好的地方之一,窄依赖内部是完全并行执行的,宽依赖则不得不等待上一个Stage执行完毕才能开始下一步
  • 解析Stage
    DAGScheduler.submitMissStage 解析Stage,作为后续任务执行的核心
    执行过程 RDD的算子链序列化(就是RDD的遍历函数func的反射执行位置)
    分区数 解析Stage的分区数(RDD分区数),作为后续Task数量(一个分区一个Task)
    数据所在地 数据源所在地或者缓存所在地,作为后续调度Task本地化的参考
  • 构建TaskSet
    将Stage的执行过程,分区数,数据所在地等核心包装为分区数个Task
    如果是ResultStage 就会包装成 ResultTask
    如果是ShuffleStage 就会包装成 ShuffleTask
    最终Stage会被解析成分区数个Task组成的TaskSet
  • 调度TaskSet(TaskScheduler.submitTask)
    遍历TaskSet的每一个Task,调度执行
    executor 首先会从driver中拿到当前保持心跳存活的executor列表
    黑名单过滤 executor列表中过滤掉处于黑名单中的executor
    本地化过滤 executor列表按照Task所在地根据本地化策略排序(executor,node,集群..)
    顺序尝试 按照本地化排序后的executor通过RPC逐个尝试(TaskScheduler.lauchTask)(扣核能否成功,成功则就发给该executor执行,失败就尝试下一个.注意这里的是超时自旋,所以本地化等待时间是一个非常重要的优化点)
    最终Task会与一个Executor完成绑定过程(TaskDescription),这样最终完成调度过程
  • 发送执行
    因为TaskDescription最终已经完成了Task的调度过程,发送执行则非常简单了
    向目标executor通过RPC发送TaskDescription,要求executor执行
  • executor接收任务执行
    executor通过RPC接收到任务TaskDescription,会还原任务并提交给自己的线程池执行
    在任务执行完毕后根据Task类型决定后续如何进行
    ResultTask执行完毕后,executor会发回执行结果
    ShuffleTask执行完毕后,executor会将结果存储在自身BlockManager中,将BlockId发回driver