Yarn源码分析5-资源调度
《Hadoop技术内幕-深入解析YARN架构设计与实现原理》学习笔记。 (Yarn源码基于Hadoop 3.0.0)
Yarn源码小组讨论班
2018-06-26
1. 基本架构
资源调度器是最核心的组件之一,并且在yarn中是可插拔的,yarn中定义了一套接口规范,以方便用户实现自己的调度器,同时yarn中自带了FIFO,Capacity Sheduler, Fair Scheduler三种常用资源调度器。
1.1 调度模型
Yarn的资源分配过程中异步的,也就是说调度器将资源分配给一个应用后,不会立刻通知AM,而是等AM的心跳主动来取。整个资源的分配过程可以概括为以下的步骤:
- 1.NM通过心跳向RM汇报节点信息
- 2.RM按照一定策略将NM上的资源分配给应用(并没有直接分配,而是将结果放到一个内存数据结构)
- 3.应用AM向RM发心跳,以领取之前记录中在RM中的最新分配给自己的Container资源
- 4.应用AM向NM发起启动container的命令,将收到Container分配到内部的各个任务
1.2 资源保证机制
当单个节点的闲置资源无法满足应用的一个container时,有两种策略:
- 放弃当前节点等待下一个节点
- 在当前节点上预留一个Container申请,等到节点有资源时优先满足预留
这两种机制都有优缺点:
- 对于直接放弃的方案,可能会导致应用等待过久才能得到满足
- 对于预留的方案,会导致等待时间内的资源浪费,从而降低了资源利用率。
Yarn使用了预留的方案。
1.3 调度排序算法
当有多个用户需要请求资源时,如果调度器的资源能满足所有请求,那么直接都给他们安排即可,可是,如果资源无法满足所有用户,那么就需要考虑一下资源如何分配是合理的。
Yarn中主要使用的是:主资源公平调度算法(Dominant Resource Fairness),相关介绍参考:Yarn源码分析4-资源调度排序算法.
1.4 资源抢占模型
在资源调度器中,每个队列可设置一个最小资源量和最大资源量,其中
- 最小资源量是资源紧缺情况下每个队列需要保证的资源量
- 最大资源量是在极端情况下队列也不能超过的资源使用量
资源抢占发生的原因是为了保证队列的“最小资源量”。后续再专门梳理一下这部分的内容。
1.5 队列层级管理
队列层级管理具胡以下特点:
- 子队列
- 队列可以嵌套,每个队列均可以包含子队列。
- 用户只能将应用程序提交到叶子队列。
- 最少容量(min share)
- 每个子队列都有一个属性,表示可以占用父队列容量的百分比
- 调度器总是优先选择当前资源使用率最低的队列,并为之分配资源
- 最少容量不是总能保证的最低容量,需求低于这个值时是按需分配的
- 最少容量为正值,不能大于最大容量
- 最大容量(max share)
- 是一个队列资源使用的上限,任何时刻使用的资源总量不能超过此值
- 默认情况下最大容量是无限大
2.NM通过心跳向RM汇报节点信息
NM中负责心跳的类是NodeStatusUpdater类型的成员变量nodeStatusUpdater,它在NM调用serviceInit的时候被创建:
1 | //位置:org/apache/hadoop/yarn/server/nodemanager/NodeManager.java |
如上,根据是否使用nodeLabelsProvider有些参数上的区别(实例上调用的是同一个方法,只是nodeLabelsProvider在不声明时为空)。
1 | //位置:org/apache/hadoop/yarn/server/nodemanager/NodeManager.java |
NodeStatusUpdaterImpl是真正的负责与RM通讯的类,其中的serviceStart方法中会进行注册和心跳:
1 | //位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java |
向RM注册部分直接跳过了,这里重点看一下心跳:
1 | //位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java |
下面看一下StatusUpdaterRunnable的run方法:
1 | //位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java |
3.RM收到心跳
RM中负责接收NM心跳的子服务是:ResourceTrackerService
下面直接看它的nodeHeartbeat方法。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java |
这里的关键是第4步,它会向RM中发送一个事件,这个事件的处理中会包含调度的过程。
3.1 谁处理RMNodeStatusEvent
其中RMNodeStatusEvent继承自RMNodeEvent,RMNodeEvent在RM的注册处理器的代码如下:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java |
其中NodeEventDispatcher的handle方法如下:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java |
这里会调用RMNode的handle方法,RMNodeRMNode是一个接口类,它的实现者是:RMNodeImpl,下面看一下它的handle方法。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java |
这里会进入RMNodeImpl的状态机,由于RMNodeStatusEvent的事件类型是RMNodeEventType.STATUS_UPDATE,看一下状态中对这个事件的处理只有两种情况:
- 从RUNNING到RUNNING、RUNNING,调用StatusUpdateWhenHealthyTransition
- 从DECOMMISSIONING到DECOMMISSIONING、DECOMMISSIONED,调用StatusUpdateWhenHealthyTransition
- 从UNHEALTHY到UNHEALTHY、RUNNING,调用StatusUpdateWhenUnHealthyTransition
这里,选择最常见的情况,就是从RUNNING到RUNNING的情况,查看被调用的StatusUpdateWhenHealthyTransition的transition方法:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java |
这里最关键是部分是向向RM发送一个NodeUpdateSchedulerEvent事件。
3.2 谁处理NodeUpdateSchedulerEvent事件
NodeUpdateSchedulerEvent继承自SchedulerEvent,SchedulerEvent在RM中注册的处理器如下:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java |
其中createSchedulerEventDispatcher()代码如下:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java |
其中scheduler对象是根据配置yarn.resourcemanager.yarn.resourcemanager指定的类生成的对象,这里使用org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。
下面进入FairScheduler的handle方法:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java |
由于NodeUpdateSchedulerEvent的事件类型是SchedulerEventType.NODE_UPDATE,这里会进入NODE_UPDATE处理逻辑。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java |
上面的代码中,
- 先调用父类的nodeUpdate()方法会进行container状态更新,和NM的状态的更新,这里跳过这部分逻辑。
- 然后获取了FSSchedulerNode的一个实例,并尝试进行调度。下面重点看一下这块逻辑。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java |
这里不关注抢占和reserve机制,重点关注分配新container的部分,这里是调用queueMgr,找到RootQueue,然后调用了它的assignContainer(node)方法。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java |
关于drf的思想参考:Yarn源码分析4-资源调度排序算法.
这里重点介绍一下读锁内的assignContainer的部分:
- 通过遍历所有孩子节点,递归的调用assignContainer方法。
- 如果孩子节点是FSParentQueue类型,那么还是递归进入到跟刚才一样的逻辑中.
- 如果孩子节点是FSLeafQueue类型,那么进入到后面的逻辑:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java |
上面这块代码,有两个点需要重点关注:
- fetchAppsWithDemand:找到饥饿的app列表,并按照drf的策略进行排序,然后遍历
- FSAppAttempt类的实例sched.assignContainer(node)方法,这里会进行container的分配,下面进入这块逻辑:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java |
首先判断是否达到了队列中可用于运行AM的资源比例限制,如果没有的话,继续跟进:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java |
上面的的代码,主要逻辑是按priority从高到低的顺序遍历所有的ResourceRequest,针对每个ResourceRequest,在当前的node上面,找到适合它的locality,并将这个locality传入到下一级的assignContainer()函数中。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java |
上面代码中,主要是看一下能否分配一个container出来,如果不能话,那么看一下能否进行一次reserve。
这是重点关注分配一个container出来的逻辑:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java |
4.AM向RM发心跳认领资源
经过上面的过程,已经能将分配给APP的container记录在RM内存中了。接下来就是AM的心跳领取分配给自己的资源,并用来计算的过程了。
4.1 AM发起请求
由于各个计算引擎的AM逻辑不同,这里使用通用的接口方法为入口:
1 | //位置:org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java |
各个计算引擎的AM是通过allocate()这个API来向RM发起RPC请求获取计算资源,下面看一下RM是如何处理这个RPC请求的。
4.2 RM处理AM请求
RM中负责处理AM心跳请求的服务是:ApplicationMasterService,其内部的allocate()负责处理AM的RPC请求,具体逻辑如下:
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java |
这里重点关注调度相关的主逻辑,因此继续跟进到amsProcessingChain.allocate这个方法中。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java |
这个方法非常简单,目前先不关注OpportunisticContainer的部分(hadoop 3.0的新特性),看看历史版本中都存在的主要调度功能
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java |
上面的逻辑中,重点关注调度的主线,继续跟进getScheduler().allocate方法,这里的scheduler假设配置是的FairScheduler,那么进入它的allocate方法中。
1 | //位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java |
到这里,AM可以拿到RM分配的container了,整体的过程就是这样。至于AM拿到Container之后,怎么样来使用,各个计算引擎都不太一样,就不再分别介绍了。
至此,整个YARN的调度的过程也就清楚了。