
YARN Source Code Analysis 6: The Reserve Mechanism
Study notes on 《Hadoop技术内幕-深入解析YARN架构设计与实现原理》 (Hadoop Technology Internals: An In-depth Analysis of YARN Architecture Design and Implementation). The YARN source code in this series is based on Hadoop 3.1.0.

2018-07-17

The previous post in this series, [Yarn源码分析5-资源调度] (resource scheduling), walked through the main scheduling flow but skipped several branches, such as the preemption mechanism, the reservation (reserve) mechanism, and the ContinueAssigning mechanism. This post looks at the reserve mechanism in detail.

1. Introduction

When a single node's free resources cannot fit a requested container, there are two possible strategies:

  • Give up on the current node and wait for the next one
  • Reserve the container request on the current node; once the node frees up resources the reservation is satisfied first, and until then the node cannot serve other requests

Each approach has its pros and cons:

  • Simply giving up may leave the application waiting a long time before its request is satisfied
  • Reserving wastes the node's resources while the reservation waits, lowering overall resource utilization

YARN takes the reservation approach, i.e. the reserve mechanism. It costs some cluster resource utilization, but it guarantees that requests for relatively large containers are treated fairly. The rest of this post analyzes the mechanism at the source-code level.
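To make the trade-off concrete, here is a minimal standalone sketch (plain Java, no Hadoop classes; all names and numbers are made up for illustration) of the two strategies a scheduler could take when a node cannot fit a request:

// Hypothetical, simplified model, not Hadoop code. It only illustrates the "skip"
// vs. "reserve" choice, using memory (MB) as the only resource dimension.
public class ReservePolicySketch {

  // Strategy 1: skip. Give up on this node; the request waits for another node/heartbeat.
  static boolean trySkipStrategy(int freeMb, int askMb) {
    return askMb <= freeMb;            // allocate only if it fits right now
  }

  // Strategy 2: reserve. If it does not fit, pin the request to this node so that
  // resources freed later are not handed to smaller requests first.
  static String tryReserveStrategy(int freeMb, int askMb) {
    return (askMb <= freeMb) ? "ALLOCATED" : "RESERVED"; // RESERVED blocks this node for others
  }

  public static void main(String[] args) {
    int freeMb = 2048, bigAskMb = 6144;
    System.out.println("skip strategy allocates now: " + trySkipStrategy(freeMb, bigAskMb));    // false
    System.out.println("reserve strategy result:     " + tryReserveStrategy(freeMb, bigAskMb)); // RESERVED
  }
}

With the skip strategy the 6 GB ask is simply not served this heartbeat; with the reserve strategy the node is pinned for it, which is exactly the utilization-versus-fairness trade-off described above.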

2. Starting from the Heartbeat

Per the earlier analysis of the main scheduling flow, when a node heartbeat arrives it is handled by FairScheduler's nodeUpdate(RMNode nm), which eventually calls attemptScheduling(fsNode). We start directly from there.

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
void attemptScheduling(FSSchedulerNode node) {
try {
writeLock.lock();
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
.isSchedulerReadyForAllocatingContainers()) {
return;
}

final NodeId nodeID = node.getNodeID();
if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info(
"Skipping scheduling as the node " + nodeID + " has been removed");
return;
}

// Assign new containers...
// 1. First, hand out containers that were freed up by preemption
// 2. Then check whether this node holds a reservation
// 3. Only after that, run normal scheduling to assign new containers

// Apps may wait for preempted containers
// We have to satisfy these first to avoid cases, when we preempt
// a container for A from B and C gets the preempted containers,
// when C does not qualify for preemption itself.
assignPreemptedContainers(node);
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
boolean validReservation = false;
if (reservedAppSchedulable != null) {
// Check whether the node holds a valid reservation
validReservation = reservedAppSchedulable.assignReservedContainer(node);
}
if (!validReservation) {
// No valid reservation; proceed with the normal scheduling flow
int assignedContainers = 0;
Resource assignedResource = Resources.clone(Resources.none());
Resource maxResourcesToAssign = Resources.multiply(
node.getUnallocatedResource(), 0.5f);
while (node.getReservedContainer() == null) {
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
if (assignment.equals(Resources.none())) {
break;
}

assignedContainers++;
Resources.addTo(assignedResource, assignment);
if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
assignedResource)) {
break;
}
}
}
updateRootQueueMetrics();
} finally {
writeLock.unlock();
}
}

For the purposes of analyzing the reserve mechanism, the flow forks here:

  • Check whether the node holds a valid reservation; if so, assign its resources and scheduling for this heartbeat ends
  • If there is no valid reservation, continue with the normal scheduling flow, which may itself end up creating a reservation

Although in the code fulfilling a reservation comes before creating one, that is clearly not the logical order in time. The analysis below follows the natural chronological order: first how a reservation is created, then how it is fulfilled.
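Before diving into the code, here is a tiny self-contained simulation of that chronological order, assuming a single node and a single oversized request (all names and numbers are hypothetical, not Hadoop APIs): one heartbeat creates the reservation, and a later heartbeat fulfills it once enough resources have been freed.

// Hypothetical simulation of the two branches across consecutive heartbeats.
// Everything here is a simplified stand-in, not the real FSSchedulerNode/FSAppAttempt.
public class ReservationLifecycleSketch {
  static Integer reservedMb = null;   // stand-in for node.getReservedContainer()
  static int freeMb = 2048;           // node's unallocated memory
  static final int ASK_MB = 6144;     // one oversized pending request

  static void heartbeat() {
    if (reservedMb != null) {
      // Branch 1: the node holds a reservation, try to fulfill it first
      if (reservedMb <= freeMb) {
        freeMb -= reservedMb;
        reservedMb = null;
        System.out.println("reservation fulfilled, free=" + freeMb);
      } else {
        System.out.println("reservation still waiting, free=" + freeMb);
      }
    } else {
      // Branch 2: normal scheduling, which may itself create a reservation
      if (ASK_MB <= freeMb) {
        freeMb -= ASK_MB;
        System.out.println("allocated directly");
      } else {
        reservedMb = ASK_MB;
        System.out.println("reservation created for " + ASK_MB + " MB");
      }
    }
  }

  public static void main(String[] args) {
    heartbeat();        // heartbeat N:   creates the reservation
    freeMb += 8192;     // some containers on the node finish and free resources
    heartbeat();        // heartbeat N+1: fulfills the reservation
  }
}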

3. Creating a Reservation

Assume that validReservation is false in this heartbeat, so we enter the normal scheduling flow. Following the earlier analysis, after a few hops we land in FSAppAttempt's assignContainer(FSSchedulerNode node), which is where we pick up:

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@Override
public Resource assignContainer(FSSchedulerNode node) {
if (isOverAMShareLimit()) {
PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
" exceeds maximum AM resource allowed).");
if (LOG.isDebugEnabled()) {
LOG.debug("AM resource request: " + amAsk.getPerAllocationResource()
+ " exceeds maximum AM resource allowed, "
+ getQueue().dumpState());
}
return Resources.none();
}
return assignContainer(node, false);
}

This method mainly checks whether the application has exceeded the configured AM resource limit. Note the last line: the false in return assignContainer(node, false) means the current node does not already hold a reserved container.
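As a quick mental model of that boolean (a simplified stand-in, not the real FSAppAttempt.assignContainer(FSSchedulerNode, boolean) implementation), the flag only changes which scheduler keys the method is willing to try:

// Simplified stand-in that only illustrates what the boolean means.
public class AssignFlagSketch {
  static String keysToTry(boolean reserved) {
    // reserved == true : fulfilling an existing reservation, so only the reserved key is tried
    // reserved == false: normal scheduling, every pending scheduler key is tried
    return reserved
        ? "only node.getReservedContainer().getReservedSchedulerKey()"
        : "all keys from getSchedulerKeys()";
  }

  public static void main(String[] args) {
    System.out.println("called from assignContainer(node):         " + keysToTry(false));
    System.out.println("called from assignReservedContainer(node): " + keysToTry(true));
  }
}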

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isTraceEnabled()) {
LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
}

// This unassuming statement is important: it decides whether we are assigning an
// already-reserved container or a regular one. If reserved is true, the only key tried is
// node.getReservedContainer().getReservedSchedulerKey(); this same method is also called
// when a reservation is actually being fulfilled, and that call takes the reserved == true path.
// Otherwise all keys from getSchedulerKeys() are tried. Per the caller described above,
// we are on the reserved == false path here.
Collection<SchedulerRequestKey> keysToTry = (reserved) ?
Collections.singletonList(
node.getReservedContainer().getReservedSchedulerKey()) :
getSchedulerKeys();

// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
try {
writeLock.lock();

// TODO (wandga): All logics in this method should be added to
// SchedulerPlacement#canDelayTo which is independent from scheduler.
// Scheduler can choose to use various/pluggable delay-scheduling
// implementation.
for (SchedulerRequestKey schedulerKey : keysToTry) {
// Skip it for reserved container, since
// we already check it in isValidReservation.
if (!reserved && !hasContainerForNode(schedulerKey, node)) {
continue;
}

addSchedulingOpportunity(schedulerKey);

PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
node.getRackName());
PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
node.getNodeName());

if (nodeLocalPendingAsk.getCount() > 0
&& !appSchedulingInfo.canDelayTo(schedulerKey,
node.getNodeName())) {
LOG.warn("Relax locality off is not supported on local request: "
+ nodeLocalPendingAsk);
}

NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = getAllowedLocalityLevel(schedulerKey,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}

if (rackLocalPendingAsk.getCount() > 0
&& nodeLocalPendingAsk.getCount() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: NODE_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
// Assign a container at NODE_LOCAL locality
return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
reserved, schedulerKey);
}

if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
continue;
}

if (rackLocalPendingAsk.getCount() > 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
.equals(NodeType.OFF_SWITCH))) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: RACK_LOCAL" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
// Assign a container at RACK_LOCAL locality
return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
reserved, schedulerKey);
}

PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
ResourceRequest.ANY);
if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
continue;
}

if (offswitchAsk.getCount() > 0) {
if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
<= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: OFF_SWITCH" + ", allowedLocality: "
+ allowedLocality + ", priority: "
+ schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId);
}
// Assign a container at OFF_SWITCH locality
return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
reserved, schedulerKey);
}
}

if (LOG.isTraceEnabled()) {
LOG.trace("Can't assign container on " + node.getNodeName()
+ " node, allowedLocality: " + allowedLocality + ", priority: "
+ schedulerKey.getPriority() + ", app attempt id: "
+ this.attemptId);
}
}
} finally {
writeLock.unlock();
}

return Resources.none();
}
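The allowedLocality computed above implements delay scheduling: the more scheduling opportunities a request has missed, the more its locality requirement is relaxed. The sketch below illustrates the idea with simplified thresholds; it is not the real getAllowedLocalityLevel() implementation (which, among other things, resets its opportunity counter when it relaxes a level).

// A sketch of the delay-scheduling idea only; thresholds and counter handling are simplified.
public class DelaySchedulingSketch {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  static NodeType allowedLocality(long missedOpportunities, int numClusterNodes,
                                  double nodeLocalityThreshold, double rackLocalityThreshold) {
    if (missedOpportunities < nodeLocalityThreshold * numClusterNodes) {
      return NodeType.NODE_LOCAL;   // still insist on node-local placement
    }
    if (missedOpportunities < rackLocalityThreshold * numClusterNodes) {
      return NodeType.RACK_LOCAL;   // relax to rack-local placement
    }
    return NodeType.OFF_SWITCH;     // give up on locality entirely
  }

  public static void main(String[] args) {
    int nodes = 100;
    System.out.println(allowedLocality(10, nodes, 0.5, 1.0));   // NODE_LOCAL
    System.out.println(allowedLocality(60, nodes, 0.5, 1.0));   // RACK_LOCAL
    System.out.println(allowedLocality(150, nodes, 0.5, 1.0));  // OFF_SWITCH
  }
}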

The method above eventually settles on a suitable locality level and moves on to the actual assignment:

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
private Resource assignContainer(
FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
boolean reserved, SchedulerRequestKey schedulerKey) {

// How much does this request need?
Resource capability = pendingAsk.getPerAllocationResource();

// How much does the node have?
Resource available = node.getUnallocatedResource();

Container reservedContainer = null;
if (reserved) {
// If there is already a reservation on this node, fetch the reserved container
reservedContainer = node.getReservedContainer().getContainer();
}

// Does this node have enough free resources for this container?
if (Resources.fitsIn(capability, available)) {
// Try to allocate: a non-null allocatedContainer means a container was actually assigned
RMContainer allocatedContainer =
allocate(type, node, schedulerKey, pendingAsk,
reservedContainer);
if (allocatedContainer == null) {
// A null allocatedContainer means the application no longer wants anything here,
// so any existing reservation can be released
if (reserved) {
unreserve(schedulerKey, node);
}
return Resources.none();
}

// Reaching here means a container was really allocated, so release any earlier reservation
if (reserved) {
unreserve(schedulerKey, node);
}

// Record the newly allocated container on the node
node.allocateContainer(allocatedContainer);

// Non-unmanaged AMs need special bookkeeping; the unmanaged-AM mechanism is not covered here
if (!isAmRunning() && !getUnmanagedAM()) {
setAMResource(capability);
getQueue().addAMResourceUsage(capability);
setAmRunning(true);
}
// On the happy path the container has been allocated and we return here
return capability;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Resource request: " + capability + " exceeds the available"
+ " resources of the node.");
}

// Reaching here means this node cannot satisfy the request, so we try to reserve

// The request must first be reservable, which requires two things:
// 1. the application is currently starved according to its fair share
// 2. only containers above a certain size are allowed to be reserved
if (isReservable(capability) &&
// Only reserve when no containers on this node have been preempted for this app,
// otherwise we could end up reserving twice
!node.isPreemptedForApp(this) &&
// Note: the reservation is created here
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
type, schedulerKey)) {
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
+ "the node and the request is reserved)");
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + "'s resource request is reserved.");
}
return FairScheduler.CONTAINER_RESERVED;
} else {
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
+ "the node and the request cannot be reserved)");
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't create reservation for app: " + getName()
+ ", at priority " + schedulerKey.getPriority());
}
return Resources.none();
}
}
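For the isReservable() gate, the comments above name two conditions: the application must be starved relative to its fair share, and the request must be at least a configurable fraction of the largest schedulable container. Below is a hedged sketch of how those two conditions combine; the names are simplified and this is not the real FSAppAttempt method.

// A simplified stand-in for the two conditions described above; illustration only.
public class IsReservableSketch {
  static boolean isReservable(boolean appIsStarved, int askMb,
                              int maxContainerMb, double reservationThresholdRatio) {
    boolean bigEnough = askMb >= reservationThresholdRatio * maxContainerMb;
    return appIsStarved && bigEnough;
  }

  public static void main(String[] args) {
    // Small ask from a starved app: cheaper to just wait for another node than to pin one.
    System.out.println(isReservable(true, 512, 8192, 0.2));   // false
    // Large ask from a starved app: worth reserving a node for it.
    System.out.println(isReservable(true, 6144, 8192, 0.2));  // true
  }
}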

Next we follow reservation creation into reserve(…).

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
private boolean reserve(Resource perAllocationResource, FSSchedulerNode node,
Container reservedContainer, NodeType type,
SchedulerRequestKey schedulerKey) {

RMContainer nodeReservedContainer = node.getReservedContainer();
// Reserve only when the node has no reserved container yet,
// or when the existing reserved container belongs to this same application attempt
boolean reservableForThisApp = nodeReservedContainer == null ||
nodeReservedContainer.getApplicationAttemptId()
.equals(getApplicationAttemptId());
// In addition, the number of reserved nodes in the whole cluster must not exceed a
// configured ratio (default 0.05, config: yarn.scheduler.fair.reservable-nodes)
if (reservableForThisApp && !reservationExceedsThreshold(node, type)) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
if (reservedContainer == null) {
// No prior reservation: create a new reserved container
reservedContainer =
createContainer(node, perAllocationResource,
schedulerKey);
getMetrics().reserveResource(node.getPartition(), getUser(),
reservedContainer.getResource());
RMContainer rmContainer =
super.reserve(node, schedulerKey, null, reservedContainer);
node.reserveResource(this, schedulerKey, rmContainer);
setReservation(node);
} else {
// There is already a reservation: pass the existing RMContainer into super.reserve(),
// which performs a re-reserve and records the re-reserve count (used to gauge starvation)
RMContainer rmContainer = node.getReservedContainer();
super.reserve(node, schedulerKey, rmContainer, reservedContainer);
node.reserveResource(this, schedulerKey, rmContainer);
setReservation(node);
}
return true;
}
return false;
}

At this point the reservation has been created.
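The reservationExceedsThreshold(node, type) guard referenced in the comments caps how much of the cluster can be tied up by reservations. The following is only an illustration of that cap using the ratio mentioned above (yarn.scheduler.fair.reservable-nodes, default 0.05); the real check is more involved than this.

// An illustration of the cluster-wide cap: stop creating new reservations once more than
// reservableNodesRatio of the cluster's nodes already hold one. Illustration only.
public class ReservableNodesSketch {
  static boolean reservationExceedsThreshold(int nodesWithReservations, int numClusterNodes,
                                             double reservableNodesRatio) {
    return nodesWithReservations >= reservableNodesRatio * numClusterNodes;
  }

  public static void main(String[] args) {
    int clusterNodes = 200;   // 0.05 * 200 = at most 10 reserved nodes
    System.out.println(reservationExceedsThreshold(4, clusterNodes, 0.05));   // false: may reserve
    System.out.println(reservationExceedsThreshold(10, clusterNodes, 0.05));  // true: refuse
  }
}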

4. Fulfilling a Reservation

Now let's go back to the logic that checks whether a node holds a valid reservation.

// Location: org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
boolean assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
SchedulerRequestKey reservedSchedulerKey =
rmContainer.getReservedSchedulerKey();

if (!isValidReservation(node)) {
// The app no longer needs a container on this node, so release the reservation
LOG.info("Releasing reservation that cannot be satisfied for " +
"application " + getApplicationAttemptId() + " on node " + node);
unreserve(reservedSchedulerKey, node);
return false;
}

// Reservation valid; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
+ getApplicationAttemptId() + " on node: " + node);
}

// First check that the sizes match, i.e. that the reserved container fits in the node's
// currently unallocated resources. Note an important assumption that runs through YARN:
// all containers requested at the same priority have the same size
if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getUnallocatedResource())) {
// Enter the logic that actually assigns the reserved resources
assignContainer(node, true);
}
return true;
}

We already walked through assignContainer(node, true) above, except that the second argument was false then, meaning the node had no reserved container; here it is true, meaning the node does hold one. Walking through the earlier analysis again with reserved == true is a nice way to reinforce it ;)
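To close the loop, the fitsIn check that gates fulfilment above can be pictured like this (a toy two-dimensional resource, not the real org.apache.hadoop.yarn Resource/Resources classes): the reservation turns into a real container only once every dimension of the reserved resource fits into the node's currently unallocated resource.

// A toy version of the fitsIn(...) gate, with memory and vcores as two independent dimensions.
public class FitsInSketch {
  static boolean fitsIn(int askMb, int askVcores, int freeMb, int freeVcores) {
    return askMb <= freeMb && askVcores <= freeVcores;
  }

  public static void main(String[] args) {
    // Reserved container: 6144 MB, 4 vcores.
    System.out.println(fitsIn(6144, 4, 4096, 8));  // false: memory still short, keep waiting
    System.out.println(fitsIn(6144, 4, 8192, 8));  // true: fulfill via assignContainer(node, true)
  }
}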