YARN Source Code Analysis 5: Resource Scheduling

Study notes on 《Hadoop技术内幕-深入解析YARN架构设计与实现原理》 (Hadoop Internals: In-depth Analysis of YARN Architecture Design and Implementation). The YARN source code discussed here is based on Hadoop 3.0.0.

YARN source code study group
2018-06-26

1. Basic Architecture

The resource scheduler is one of the most central components of YARN, and it is pluggable: YARN defines a set of interfaces so that users can implement their own schedulers, and it ships with three commonly used ones: FIFO, Capacity Scheduler and Fair Scheduler.

1.1 Scheduling Model

Resource allocation in YARN is asynchronous: after the scheduler assigns resources to an application, it does not notify the AM right away, but waits for the AM's heartbeat to come and pull the assignment. The whole allocation process can be summarized in the following steps (a minimal AM-side sketch follows the list):

  • 1. The NM reports its node information to the RM via heartbeat.
  • 2. The RM assigns the NM's resources to applications according to some policy (nothing is handed out directly; the result is only recorded in an in-memory data structure).
  • 3. The application's AM heartbeats to the RM and picks up the containers most recently recorded in the RM as assigned to it.
  • 4. The AM asks the NM to launch the containers and distributes the received containers among its internal tasks.
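
A minimal sketch of what steps 3 and 4 look like from the AM side, using the AMRMClient/NMClient library wrappers (illustrative only, not the code of any particular AM; the container request, launch command and error handling are made-up placeholders):

import java.util.Collections;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class AmPullLoopSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Register with the RM, then ask for one 1 GB / 1 vcore container.
    rmClient.registerApplicationMaster("", 0, "");
    rmClient.addContainerRequest(new ContainerRequest(
        Resource.newInstance(1024, 1), null, null, Priority.newInstance(0)));

    int launched = 0;
    while (launched < 1) {
      // Step 3: heartbeat; the response carries whatever containers the RM has
      // already recorded for this application since the previous call.
      AllocateResponse response = rmClient.allocate(0.1f);
      for (Container container : response.getAllocatedContainers()) {
        // Step 4: hand the container to its NM to launch a task.
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList("sleep 10"));
        nmClient.startContainer(container, ctx);
        launched++;
      }
      Thread.sleep(1000); // heartbeat interval
    }
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  }
}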

1.2 Resource Guarantee Mechanism

When the free resources on a single node cannot satisfy one of an application's containers, there are two possible strategies:

  • Give up on the current node and wait for the next one
  • Reserve a container request on the current node, and satisfy the reservation first once the node frees up resources

Both approaches have drawbacks:

  • Simply giving up may keep the application waiting a long time before its request is satisfied
  • Reserving wastes the reserved resources during the wait, which lowers overall utilization.

YARN uses the reservation approach; a small conceptual sketch follows.
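
A conceptual sketch of the reservation idea in plain Java (purely illustrative, not YARN code; the real logic appears later in FSAppAttempt#assignContainer):

public class ReservationSketch {
  int freeMb = 2048;
  Integer reservedMb; // null when nothing is reserved on this node

  /** Offer the node to a request; returns the MB granted now, or 0 if the request was reserved. */
  int offer(int requestMb) {
    if (freeMb >= requestMb) { // enough room: allocate immediately
      freeMb -= requestMb;
      return requestMb;
    }
    reservedMb = requestMb;    // not enough room: remember the request and wait
    return 0;
  }

  /** When containers finish, the reservation is satisfied before anything else. */
  void release(int mb) {
    freeMb += mb;
    if (reservedMb != null && freeMb >= reservedMb) {
      freeMb -= reservedMb;
      System.out.println("reservation of " + reservedMb + " MB satisfied");
      reservedMb = null;
    }
  }

  public static void main(String[] args) {
    ReservationSketch node = new ReservationSketch();
    node.offer(4096);   // too large for the free 2048 MB: reserved instead of allocated
    node.release(3072); // freed-up resources go to the reservation first
  }
}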

1.3 Scheduling Ordering Algorithm

When several users request resources and the scheduler has enough to satisfy all of them, it can simply grant every request. When it does not, it has to decide what a reasonable way of dividing the resources looks like.

YARN mainly uses Dominant Resource Fairness (DRF); see the earlier post Yarn源码分析4-资源调度排序算法 for details. A tiny numeric illustration of the idea follows.
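
A tiny, self-contained illustration of the DRF idea (the cluster size and usage numbers below are made up): each application's share of every resource type is computed against the cluster total, its dominant share is the largest of those, and the scheduler next serves the application whose dominant share is smallest.

public class DrfSketch {
  // Made-up cluster capacity: 100 GB of memory and 40 vcores.
  static final double CLUSTER_MEM_GB = 100, CLUSTER_VCORES = 40;

  static double dominantShare(double usedMemGb, double usedVcores) {
    return Math.max(usedMemGb / CLUSTER_MEM_GB, usedVcores / CLUSTER_VCORES);
  }

  public static void main(String[] args) {
    double a = dominantShare(30, 4);  // memory-heavy app: max(0.30, 0.10) = 0.30
    double b = dominantShare(10, 10); // CPU-heavy app:    max(0.10, 0.25) = 0.25
    // DRF offers the next container to the app with the smaller dominant share (B here).
    System.out.println(a > b ? "offer resources to B" : "offer resources to A");
  }
}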

1.4 Resource Preemption Model

In the resource scheduler, each queue can be configured with a minimum and a maximum amount of resources, where

  • the minimum amount is what each queue is guaranteed when resources are scarce
  • the maximum amount is a cap the queue may never exceed, even in extreme cases

Preemption exists to enforce each queue's "minimum amount"; that part will be covered in detail separately later. A rough sketch of the idea follows.
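
A rough, purely conceptual sketch of what "preempting to restore the minimum amount" means (plain Java with made-up numbers; the real FairScheduler preemption logic is considerably more involved):

public class PreemptionSketch {
  /** How far a queue is below its guaranteed minimum (0 if it is at or above it). */
  static int deficitMb(int minShareMb, int usedMb) {
    return Math.max(0, minShareMb - usedMb);
  }

  public static void main(String[] args) {
    // Queue "prod" is guaranteed 40 GB but currently holds only 25 GB; queues running
    // above their share are the candidates to give back the missing 15 GB via preemption.
    System.out.println("to preempt for prod: " + deficitMb(40 * 1024, 25 * 1024) + " MB");
  }
}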

1.5 Hierarchical Queue Management

Hierarchical queue management has the following characteristics (a small self-contained sketch follows the list):

  • Child queues
    • Queues can be nested: every queue may contain child queues.
    • Users can only submit applications to leaf queues.
  • Minimum share (min share)
    • Each child queue has an attribute giving the percentage of its parent's capacity it may occupy.
    • The scheduler always picks the queue with the lowest current resource utilization first and allocates resources to it.
    • The minimum share is not a floor that is always held back: when a queue's demand is below this value, it is only allocated what it needs.
    • The minimum share must be positive and may not exceed the maximum share.
  • Maximum share (max share)
    • An upper bound on a queue's resource usage; the total resources in use may never exceed this value at any time.
    • By default the maximum share is unlimited.
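
A minimal, self-contained model of these rules (illustrative only; the queue names, numbers and API are made up and are not the FairScheduler's classes):

import java.util.ArrayList;
import java.util.List;

public class QueueSketch {
  final String name;
  final double minShare;               // fraction of the parent's capacity, must be positive
  double maxShare = Double.MAX_VALUE;  // unlimited unless explicitly configured
  final List<QueueSketch> children = new ArrayList<>();

  QueueSketch(String name, double minShare) {
    if (minShare <= 0 || minShare > maxShare) {
      throw new IllegalArgumentException("min share must be positive and <= max share");
    }
    this.name = name;
    this.minShare = minShare;
  }

  boolean isLeaf() { return children.isEmpty(); }

  void submitApp(String app) {
    // Applications may only be submitted to leaf queues.
    if (!isLeaf()) {
      throw new IllegalStateException(name + " is not a leaf queue");
    }
    System.out.println(app + " submitted to " + name);
  }

  public static void main(String[] args) {
    QueueSketch root = new QueueSketch("root", 1.0);
    QueueSketch prod = new QueueSketch("root.prod", 0.7);
    root.children.add(prod);
    prod.submitApp("app_0001"); // ok: root.prod is a leaf
    // root.submitApp("app_0002"); // would throw: root has children
  }
}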

2. The NM Reports Node Information to the RM via Heartbeat

Inside the NM, heartbeats are handled by the NodeStatusUpdater member nodeStatusUpdater, which is created when the NM's serviceInit runs:

// Location: org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
if (null == nodeLabelsProvider) {
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
} else {
addIfService(nodeLabelsProvider);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
nodeLabelsProvider);
}

As shown above, the parameters differ depending on whether a nodeLabelsProvider is in use (in practice the same kind of method is called; nodeLabelsProvider is simply null when it is not declared).

// Location: org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, nodeLabelsProvider);
}

NodeStatusUpdaterImpl is the class that actually communicates with the RM; its serviceStart method performs both registration and heartbeating:

// Location: org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@Override
protected void serviceStart() throws Exception {

// NodeManager is the last service to start, so NodeId is available.
this.nodeId = this.context.getNodeId();
LOG.info("Node ID assigned is : " + this.nodeId);
this.httpPort = this.context.getHttpPort();
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
try {
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
this.resourceTracker = getRMClient();
registerWithRM(); // register with the RM
super.serviceStart();
startStatusUpdater(); // start the heartbeat thread
} catch (Exception e) {
String errorMessage = "Unexpected error starting NodeStatusUpdater";
LOG.error(errorMessage, e);
throw new YarnRuntimeException(e);
}
}

We skip the registration with the RM here and focus on the heartbeat:

// Location: org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
protected void startStatusUpdater() {
statusUpdaterRunnable = new StatusUpdaterRunnable();
statusUpdater =
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}

Next, the run method of StatusUpdaterRunnable:

// Location: org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
public void run() {
int lastHeartbeatID = 0;
while (!isStopped) { // loop forever until the updater is stopped
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteringCollectors());

if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
List<LogAggregationReport> logAggregationReports =
getLogAggregationReportsForApps(context
.getLogAggregationStatusForApps());
if (logAggregationReports != null
&& !logAggregationReports.isEmpty()) {
request.setLogAggregationReportsForApps(logAggregationReports);
}
}

// issue the RPC to the RM and receive the response
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);

if (!handleShutdownOrResyncCommand(response)) {
// many lines omitted: handles shutdown or resync (re-register) commands from the RM
}
// Handling node resource update case.
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineCollectorData(response);
}

} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
// failed to connect to RM.
failedToConnect = true;
throw new YarnRuntimeException(e);
} catch (Exception e) {

// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
} finally {
synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
nextHeartBeatInterval;
try {
heartbeatMonitor.wait(nextHeartBeatInterval);
} catch (InterruptedException e) {
// Do Nothing
}
}
}
}
}

3. The RM Receives the Heartbeat

Inside the RM, the sub-service responsible for receiving NM heartbeats is ResourceTrackerService.

Let's go straight to its nodeHeartbeat method.

// Location: org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {

NodeStatus remoteNodeStatus = request.getNodeStatus();
/**
* Handling one heartbeat involves the following steps:
* 1. Check whether the node is valid (i.e. whether it has been blacklisted)
* 2. Check whether the node has registered
* 3. Check whether this heartbeat is a duplicate
* 4. Send the NM status as an event to the handler of RMNodeStatusEvent
* 5. Update node labels (if distributed node-label management is enabled)
* 6. Check whether this node has a dynamic resource update
* 7. Send the queue length limit for queued (opportunistic) containers
*/
// steps 1-3 skipped

// 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request
.getLogAggregationReportsForApps());
}
this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

// steps 5-7 skipped
return nodeHeartBeatResponse;
}

The key part is step 4: it posts an event inside the RM, and handling that event is what eventually drives scheduling.

3.1 Who Handles RMNodeStatusEvent

RMNodeStatusEvent extends RMNodeEvent, and the handler for RMNodeEvent is registered in the RM as follows:

// Location: org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
// Register event handler for RmNodes
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext));

The handle method of NodeEventDispatcher is:

// Location: org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
public void handle(RMNodeEvent event) {
NodeId nodeId = event.getNodeId();
RMNode node = this.rmContext.getRMNodes().get(nodeId);
if (node != null) {
try {
((EventHandler<RMNodeEvent>) node).handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for node " + nodeId, t);
}
}
}

This calls the handle method of RMNode. RMNode is an interface, and its implementation is RMNodeImpl; let's look at its handle method.

// Location: org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {
writeLock.lock();
NodeState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
" on Node " + this.nodeId + " oldState " + oldState);
}
if (oldState != getState()) {
LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
+ getState());
}
}

finally {
writeLock.unlock();
}
}

This enters RMNodeImpl's state machine. Since the event type of RMNodeStatusEvent is RMNodeEventType.STATUS_UPDATE, the state machine handles this event in only three cases (a toy illustration follows the list):

  • from RUNNING to RUNNING or UNHEALTHY, handled by StatusUpdateWhenHealthyTransition
  • from DECOMMISSIONING to DECOMMISSIONING or DECOMMISSIONED, handled by StatusUpdateWhenHealthyTransition
  • from UNHEALTHY to UNHEALTHY or RUNNING, handled by StatusUpdateWhenUnHealthyTransition
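
To make "the same event is handled differently depending on the node's current state" concrete, here is a tiny self-contained sketch of a STATUS_UPDATE transition table in plain Java (a toy model, not the RMNodeImpl StateMachineFactory code; each case is simplified to a single target state):

import java.util.EnumMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class NodeStateSketch {
  enum NodeState { RUNNING, DECOMMISSIONING, UNHEALTHY }

  // One handler per current state for the STATUS_UPDATE event, mirroring the three
  // cases listed above.
  static final Map<NodeState, UnaryOperator<NodeState>> STATUS_UPDATE =
      new EnumMap<>(NodeState.class);
  static {
    STATUS_UPDATE.put(NodeState.RUNNING, s -> NodeState.RUNNING);                // "healthy" handler
    STATUS_UPDATE.put(NodeState.DECOMMISSIONING, s -> NodeState.DECOMMISSIONING); // "healthy" handler
    STATUS_UPDATE.put(NodeState.UNHEALTHY, s -> NodeState.RUNNING);               // "unhealthy" handler, node recovered
  }

  public static void main(String[] args) {
    NodeState state = NodeState.UNHEALTHY;
    state = STATUS_UPDATE.get(state).apply(state); // dispatch on (current state, event type)
    System.out.println("new state: " + state);     // RUNNING
  }
}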

Here we take the most common case, RUNNING to RUNNING, and look at the transition method of the StatusUpdateWhenHealthyTransition that gets invoked:

// Location: org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {

RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
rmNode.setOpportunisticContainersStatus(
statusEvent.getOpportunisticContainersStatus());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent);
NodeState initialState = rmNode.getState();

// code that checks the unhealthy and decommissioning cases is skipped

rmNode.handleContainerStatus(statusEvent.getContainers());
rmNode.handleReportedIncreasedContainers(
statusEvent.getNMReportedIncreasedContainers());

List<LogAggregationReport> logAggregationReportsForApps =
statusEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) {
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
}

if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
// Key point: post a NodeUpdateSchedulerEvent to the RM's dispatcher
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode));
}

// Update DTRenewer in secure mode to keep these apps alive. Today this is
// needed for log-aggregation to finish long after the apps are gone.
if (UserGroupInformation.isSecurityEnabled()) {
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
}

return initialState;
}

The crucial part here is that a NodeUpdateSchedulerEvent is posted to the RM's dispatcher.

3.2 Who Handles NodeUpdateSchedulerEvent

NodeUpdateSchedulerEvent extends SchedulerEvent, and the handler registered for SchedulerEvent in the RM is set up as follows:

// Location: org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

The code of createSchedulerEventDispatcher() is as follows:

// Location: org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
}

The scheduler object is instantiated from the class named by the configuration property yarn.resourcemanager.scheduler.class; here we assume it is set to org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler. A rough sketch of this instantiation pattern follows.
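
A rough, hedged sketch of that pattern (simplified; the real code lives in ResourceManager#createScheduler and does much more wiring, and the default class shown below is an assumption rather than a quotation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class SchedulerConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Read the scheduler class name from configuration and instantiate it reflectively.
    String schedulerClassName = conf.get(
        "yarn.resourcemanager.scheduler.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
    Class<?> clazz = conf.getClassByName(schedulerClassName);
    Object scheduler = ReflectionUtils.newInstance(clazz, conf);
    System.out.println("instantiated scheduler: " + scheduler.getClass().getName());
  }
}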

Now into FairScheduler's handle method:

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
public void handle(SchedulerEvent event) {
switch (event.getType()) {
case NODE_ADDED: // logic omitted
case NODE_REMOVED: // logic omitted
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
case APP_ADDED: // logic omitted
case APP_REMOVED: // logic omitted
case NODE_RESOURCE_UPDATE: // logic omitted
case APP_ATTEMPT_ADDED: // logic omitted
case APP_ATTEMPT_REMOVED: // logic omitted
case CONTAINER_EXPIRED: // logic omitted
default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}
}

Since the event type of NodeUpdateSchedulerEvent is SchedulerEventType.NODE_UPDATE, the NODE_UPDATE branch is taken.

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
protected void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
long start = getClock().getTime();
eventLog.log("HEARTBEAT", nm.getHostName());
super.nodeUpdate(nm);

FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID());
attemptScheduling(fsNode);

long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
} finally {
writeLock.unlock();
}
}

In the code above:

  • It first calls the parent class's nodeUpdate() method, which updates container state and NM state; we skip that part here.
  • It then obtains the FSSchedulerNode instance for the node and attempts scheduling on it. That is the logic we focus on next.
// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
void attemptScheduling(FSSchedulerNode node) {
try {
writeLock.lock();
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
.isSchedulerReadyForAllocatingContainers()) {
return;
}

final NodeId nodeID = node.getNodeID();
if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info(
"Skipping scheduling as the node " + nodeID + " has been removed");
return;
}

// Assign new containers...
// 1. First make sure preempted containers are assigned
// 2. Then check whether there is a reserved container on this node
// 3. Finally schedule and allocate new containers

// Apps may wait for preempted containers
// We have to satisfy these first to avoid cases, when we preempt
// a container for A from B and C gets the preempted containers,
// when C does not qualify for preemption itself.
assignPreemptedContainers(node);
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
boolean validReservation = false;
if (reservedAppSchedulable != null) {
validReservation = reservedAppSchedulable.assignReservedContainer(node);
}
if (!validReservation) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
Resource assignedResource = Resources.clone(Resources.none());
Resource maxResourcesToAssign = Resources.multiply(
node.getUnallocatedResource(), 0.5f);
while (node.getReservedContainer() == null) {
// Key point: this is where allocation happens
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
if (assignment.equals(Resources.none())) {
break;
}

assignedContainers++;
Resources.addTo(assignedResource, assignment);
if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
assignedResource)) {
break;
}
}
}
updateRootQueueMetrics();
} finally {
writeLock.unlock();
}
}

We ignore preemption and the reservation mechanism here and focus on allocating new containers: via queueMgr the root queue is located, and its assignContainer(node) method is called.

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();

// return immediately if the queue is already over its max share
if (!assignContainerPreCheck(node)) {
return assigned;
}

// Sort all child queues.
// The ordering policy is set by defaultQueueSchedulingPolicy in fair-scheduler.xml;
// here drf is assumed.
writeLock.lock();
try {
Collections.sort(childQueues, policy.getComparator());
} finally {
writeLock.unlock();
}

/*
* The write lock has been released and a read lock taken instead.
* This can lead to two situations:
* 1. A queue was just added: correctness is unaffected, the new queue is handled next time
* 2. A queue was just removed: this would ideally be handled, but currently is not, which is acceptable
*/
readLock.lock();
try {
for (FSQueue child : childQueues) {
assigned = child.assignContainer(node);
if (!Resources.equals(assigned, Resources.none())) {
break;
}
}
} finally {
readLock.unlock();
}
return assigned;
}

For the ideas behind DRF, see Yarn源码分析4-资源调度排序算法.

Here we focus on the assignContainer calls made under the read lock:

  • It iterates over all child queues and calls assignContainer recursively.
    • If the child is an FSParentQueue, the recursion re-enters the same logic as above.
    • If the child is an FSLeafQueue, it enters the logic below:
// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = none();
if (LOG.isDebugEnabled()) {
LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
getName() + " fairShare: " + getFairShare());
}
// check whether the queue has exceeded its max share
if (!assignContainerPreCheck(node)) {
return assigned;
}

for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
assigned = sched.assignContainer(node);
if (!assigned.equals(none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container in queue:" + getName() + " " +
"container:" + assigned);
}
break;
}
}
return assigned;
}

Two points in the code above deserve attention:

  • fetchAppsWithDemand: finds the list of apps that still have unsatisfied demand, sorts them according to the DRF policy, and iterates over them
  • sched.assignContainer(node) on the FSAppAttempt instance, which performs the actual container allocation; let's step into that logic:
// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
public Resource assignContainer(FSSchedulerNode node) {
if (isOverAMShareLimit()) {
PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
" exceeds maximum AM resource allowed).");
if (LOG.isDebugEnabled()) {
LOG.debug("AM resource request: " + amAsk.getPerAllocationResource()
+ " exceeds maximum AM resource allowed, "
+ getQueue().dumpState());
}
return Resources.none();
}
return assignContainer(node, false);
}

It first checks whether the queue's limit on the share of resources that may be used for running AMs has been reached; if not, we follow the call further:

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isTraceEnabled()) {
LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
}

// the scheduler keys to try, ordered by priority
Collection<SchedulerRequestKey> keysToTry = (reserved) ?
Collections.singletonList(
node.getReservedContainer().getReservedSchedulerKey()) :
getSchedulerKeys();

// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
try {
writeLock.lock();

// Iterate over all ResourceRequests from high to low priority.
// If a ResourceRequest can be satisfied on the current node, enter the allocation logic.
for (SchedulerRequestKey schedulerKey : keysToTry) {
// Skip requests that cannot be placed on the current node.
// hasContainerForNode() considers the node, rack and any levels when deciding whether
// a suitable container exists, and also checks whether the node has enough free resources left.
if (!reserved && !hasContainerForNode(schedulerKey, node)) {
continue;
}

// increment the scheduling-opportunity counter
addSchedulingOpportunity(schedulerKey);

PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
node.getRackName());
PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
node.getNodeName());

// If there is a node-level locality request but relaxing locality is not allowed, log a warning.
if (nodeLocalPendingAsk.getCount() > 0
&& !appSchedulingInfo.canDelayTo(schedulerKey,
node.getNodeName())) {
LOG.warn("Relax locality off is not supported on local request: "
+ nodeLocalPendingAsk);
}

NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = getAllowedLocalityLevel(schedulerKey,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}

// If there are both node-level and rack-level requests, call the next-level assignContainer with NODE_LOCAL and return.
if (rackLocalPendingAsk.getCount() > 0
&& nodeLocalPendingAsk.getCount() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: NODE_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
reserved, schedulerKey);
}

// If the node-local chance above was missed and the request cannot be relaxed to the rack level, skip this scheduling opportunity.
if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
continue;
}

// Call the next-level assignContainer with RACK_LOCAL and return.
if (rackLocalPendingAsk.getCount() > 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
.equals(NodeType.OFF_SWITCH))) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: RACK_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
reserved, schedulerKey);
}

PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
ResourceRequest.ANY);
if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
continue;
}

// Call the next-level assignContainer with OFF_SWITCH and return.
if (offswitchAsk.getCount() > 0) {
if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
<= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: OFF_SWITCH" + ", allowedLocality: "
+ allowedLocality + ", priority: "
+ schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
reserved, schedulerKey);
}
}

if (LOG.isTraceEnabled()) {
LOG.trace("Can't assign container on " + node.getNodeName()
+ " node, allowedLocality: " + allowedLocality + ", priority: "
+ schedulerKey.getPriority() + ", app attempt id: "
+ this.attemptId);
}
}
} finally {
writeLock.unlock();
}

return Resources.none();
}

The main logic above walks all ResourceRequests from high to low priority; for each request it determines, on the current node, the locality level that applies and passes that locality into the next-level assignContainer() function. A small illustration of the locality-delay numbers follows, and then the next-level function itself.
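
A self-contained illustration of how the locality delay behind getAllowedLocalityLevel is commonly understood (hedged: the thresholds and the simple cumulative comparison below are illustrative; the real FairScheduler logic relaxes one level at a time and has more cases):

public class LocalityDelaySketch {
  /**
   * Roughly: an app may only relax from NODE_LOCAL to RACK_LOCAL (and then to OFF_SWITCH)
   * after passing up a number of scheduling opportunities proportional to the cluster size
   * and the configured locality thresholds.
   */
  static String allowedLocality(int missedOpportunities, int numClusterNodes,
      double nodeLocalityThreshold, double rackLocalityThreshold) {
    if (missedOpportunities > numClusterNodes * rackLocalityThreshold) {
      return "OFF_SWITCH";
    } else if (missedOpportunities > numClusterNodes * nodeLocalityThreshold) {
      return "RACK_LOCAL";
    }
    return "NODE_LOCAL";
  }

  public static void main(String[] args) {
    // Example: 100 nodes, thresholds 0.5 and 0.8 (made-up values): the app must pass up
    // ~50 offers before rack-local placement is allowed, and ~80 before going off-switch.
    System.out.println(allowedLocality(30, 100, 0.5, 0.8)); // NODE_LOCAL
    System.out.println(allowedLocality(60, 100, 0.5, 0.8)); // RACK_LOCAL
    System.out.println(allowedLocality(90, 100, 0.5, 0.8)); // OFF_SWITCH
  }
}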

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
private Resource assignContainer(
FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
boolean reserved, SchedulerRequestKey schedulerKey) {

// how much resource the current request asks for
Resource capability = pendingAsk.getPerAllocationResource();

// how much unallocated resource the current node has
Resource available = node.getUnallocatedResource();

Container reservedContainer = null;
if (reserved) {
reservedContainer = node.getReservedContainer().getContainer();
}

// the node has enough resources for this request
if (Resources.fitsIn(capability, available)) {
// Key point: allocate a container here
RMContainer allocatedContainer =
allocate(type, node, schedulerKey, pendingAsk,
reservedContainer);
if (allocatedContainer == null) {
// Did the application need this resource?
if (reserved) {
unreserve(schedulerKey, node);
}
return Resources.none();
}

// If we had previously made a reservation, delete it
if (reserved) {
unreserve(schedulerKey, node);
}

// Inform the node
node.allocateContainer(allocatedContainer);

// If not running unmanaged, the first container we allocate is always
// the AM. Set the amResource for this app and update the leaf queue's AM
// usage
if (!isAmRunning() && !getUnmanagedAM()) {
setAMResource(capability);
getQueue().addAMResourceUsage(capability);
setAmRunning(true);
}

return capability;
}

// From here on, the node does not have enough resources; decide whether a reservation should be made.
if (LOG.isDebugEnabled()) {
LOG.debug("Resource request: " + capability + " exceeds the available"
+ " resources of the node.");
}

if (isReservable(capability) && // the app is starved and the request exceeds a certain size
!node.isPreemptedForApp(this) && // no preemption planned for this app on this node, otherwise we could end up with multiple reservations
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
type, schedulerKey)) {
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
+ "the node and the request is reserved)");
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + "'s resource request is reserved.");
}
return FairScheduler.CONTAINER_RESERVED;
} else {
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
+ "the node and the request cannot be reserved)");
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't create reservation for app: " + getName()
+ ", at priority " + schedulerKey.getPriority());
}
return Resources.none();
}
}

The code above mainly checks whether a container can be allocated; if it cannot, it checks whether a reservation can be made instead.

Here we focus on the logic that actually allocates a container:

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
public RMContainer allocate(NodeType type, FSSchedulerNode node,
SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
Container reservedContainer) {
RMContainer rmContainer;
Container container;

try {
writeLock.lock();
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(schedulerKey, type);
} else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(schedulerKey, type);
}
}

// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getOutstandingAsksCount(schedulerKey) <= 0) {
return null;
}

container = reservedContainer;
if (container == null) {
// Key point: a new Container instance is created here.
container = createContainer(node, pendingAsk.getPerAllocationResource(),
schedulerKey);
}

// The logic below records this newly created container.
// Create RMContainer
rmContainer = new RMContainerImpl(container, schedulerKey,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());

// Key point: the rmContainer is recorded here; on its next heartbeat the AM will pick up the allocated containers from this list.
addToNewlyAllocatedContainers(node, rmContainer);
liveContainers.put(container.getId(), rmContainer);

// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());

// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));

if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" + container.getId()
.getApplicationAttemptId() + " container=" + container.getId()
+ " host=" + container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), container.getId(),
container.getResource());
} finally {
writeLock.unlock();
}

return rmContainer;
}

4. The AM Heartbeats to the RM to Claim Its Resources

After the steps above, the containers allocated to an application are recorded in the RM's memory. What remains is for the AM's heartbeat to claim the resources assigned to it and put them to work.

4.1 The AM Issues the Request

Since every compute engine implements its AM differently, we use the common protocol method as the entry point:

// Location: org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;

Each engine's AM uses this allocate() API to issue an RPC to the RM and obtain compute resources. Next, let's see how the RM handles this RPC.

4.2 The RM Handles the AM Request

Inside the RM, the service that handles AM heartbeat requests is ApplicationMasterService; its allocate() method processes the AM's RPC, with the following logic:

// Location: org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {

AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();

ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();

this.amLivelinessMonitor.receivedPing(appAttemptId);

// Each appAttempt has its own independent lock object.
AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) {
String message =
"Application attempt " + appAttemptId
+ " doesn't exist in ApplicationMasterService cache.";
LOG.error(message);
throw new ApplicationAttemptNotFoundException(message);
}
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"AM is not registered for known application attempt: "
+ appAttemptId
+ " or RM had restarted after AM registered. "
+ " AM should re-register.";
throw new ApplicationMasterNotRegisteredException(message);
}

// Normally request.getResponseId() == lastResponse.getResponseId()
if (getNextResponseId(request.getResponseId()) == lastResponse
.getResponseId()) {
// heartbeat one step old, simply return lastReponse
return lastResponse;
} else if (request.getResponseId() != lastResponse.getResponseId()) {
String message =
"Invalid responseId in AllocateRequest from application attempt: "
+ appAttemptId + ", expect responseId to be "
+ lastResponse.getResponseId() + ", but get "
+ request.getResponseId();
throw new InvalidApplicationMasterRequestException(message);
}

AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);

// This is the key step in handling this resource request:
// the call below fills the allocated containers into the response object.
// Does "ProcessingChain" look familiar? It is the chain-of-responsibility design pattern.
this.amsProcessingChain.allocate(
amrmTokenIdentifier.getApplicationAttemptId(), request, response);

// update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey =
this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

if (nextMasterKey != null
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
.getKeyId()) {
RMApp app =
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
if (nextMasterKey.getMasterKey().getKeyId() !=
appAttemptImpl.getAMRMTokenKeyId()) {
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
+ " to application: " + appAttemptId.getApplicationId());
amrmToken = rmContext.getAMRMTokenSecretManager()
.createAndGetAMRMToken(appAttemptId);
appAttemptImpl.setAMRMToken(amrmToken);
}
response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
.newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
.toString(), amrmToken.getPassword(), amrmToken.getService()
.toString()));
}

/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
lock.setAllocateResponse(response);
return response;
}
}

We keep our focus on the main scheduling path, so we follow the call into amsProcessingChain.allocate.

// Location: org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {

// This is the chain-of-responsibility pattern at work:
// the processors are chained like a linked list, each one's head pointing to the next,
// so the AM request is passed through every registered processor in turn.
// Currently there are only two processors: the default one and the one for
// opportunistic containers.
this.head.allocate(appAttemptId, request, response);
}

This method is very simple. For now we leave aside the OpportunisticContainer part (a new feature in Hadoop 3.0) and look at the main scheduling functionality that also exists in earlier versions.

// Location: org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {

// some sanity checks here, skipped

// If the AM has been restarted and the app is configured not to keep containers
// across attempts, the containers from the previous attempt have to be released.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId,
e);
throw e;
}
}

// split the container update requests into increase and decrease requests
List<UpdateContainerError> updateErrors = new ArrayList<>();
ContainerUpdates containerUpdateRequests =
RMServerUtils.validateAndSplitUpdateResourceRequests(
getRmContext(), request, maximumCapacity, updateErrors);

Allocation allocation;
RMAppAttemptState state =
app.getRMAppAttempt(appAttemptId).getAppAttemptState();
if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
state.equals(RMAppAttemptState.FINISHING) ||
app.isAppFinalStateStored()) {
LOG.warn(appAttemptId + " is in " + state +
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
// Key point: this is the heartbeat call, so it both submits the new ask and picks up previously made allocations.
allocation =
getScheduler().allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
containerUpdateRequests);
}

if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);

if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
response.setNMTokens(allocation.getNMTokens());
}

// Notify the AM of container update errors
ApplicationMasterServiceUtils.addToUpdateContainerErrors(
response, updateErrors);

// update the response with the deltas of node status changes
handleNodeUpdates(app, response);

// this is what ends up as allocateResponse.setAllocatedContainers(allocatedContainers);
ApplicationMasterServiceUtils.addToAllocatedContainers(
response, allocation.getContainers());

response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setAvailableResources(allocation.getResourceLimit());

addToContainerUpdates(response, allocation,
((AbstractYarnScheduler)getScheduler())
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());

response.setNumClusterNodes(getScheduler().getNumClusterNodes());

// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(
getRmContext().getYarnConfiguration())) {
CollectorInfo collectorInfo = app.getCollectorInfo();
if (collectorInfo != null) {
response.setCollectorInfo(collectorInfo);
}
}

// add preemption to the allocateResponse message (if any)
response.setPreemptionMessage(generatePreemptionMessage(allocation));

// Set application priority
response.setApplicationPriority(app
.getApplicationPriority());
}

Staying on the main scheduling path, we follow getScheduler().allocate; assuming the configured scheduler is the FairScheduler, we enter its allocate method.

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {

// again, some sanity checks skipped

// handle dynamic increases/decreases of container resources
handleContainerUpdates(application, updateRequests);

// validate/normalize the requests again
normalizeRequests(ask);

// Record container allocation start time
application.recordContainerRequestTime(getClock().getTime());

// release the containers the AM wants to give back
releaseContainers(release, application);

ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
lock.lock();
try {
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"allocate: pre-update" + " applicationAttemptId=" + appAttemptId
+ " application=" + application.getApplicationId());
}
application.showRequests();

// Update the application's resource requests. (This happens under the lock; locked operations usually mark the important steps.)
application.updateResourceRequests(ask);

application.showRequests();
}
} finally {
lock.unlock();
}

Set<ContainerId> preemptionContainerIds =
application.getPreemptionContainerIds();
if (LOG.isDebugEnabled()) {
LOG.debug(
"allocate: post-update" + " applicationAttemptId=" + appAttemptId
+ " #ask=" + ask.size() + " reservation= " + application
.getCurrentReservation());

LOG.debug("Preempting " + preemptionContainerIds.size()
+ " container(s)");
}

application.updateBlacklist(blacklistAdditions, blacklistRemovals);

// Fetch the newly allocated containers: these are exactly the containers the RM recorded in the final step of the heartbeat-driven allocation described earlier.
List<Container> newlyAllocatedContainers =
application.pullNewlyAllocatedContainers();
// Record container allocation time
if (!(newlyAllocatedContainers.isEmpty())) {
application.recordContainerAllocationTime(getClock().getTime());
}

Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens(), null, null,
application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers());
}

At this point the AM can obtain the containers the RM has allocated to it; that is the whole flow. How the AM uses the containers afterwards differs from engine to engine, so we will not go through each one here.

With that, the overall YARN scheduling process should be clear.