Flink on YARN Source Code Analysis (Based on Version 1.4)

liyakun.hit
2017-12-14

1. Starting from the Command Line

Submitting a job to Flink requires a command like the following:
flink_deploy/deploy/flink-dev/bin/flink run -d -c com.bytedance.data.Kafka2ElasticSearch -m yarn-cluster -yn 2 -ytm 1024 -yjm 2048 -ys 8 -yqu root.flink_cluster.online ~/wordcount/target/wordcount-1.0-SNAPSHOT.jar
Let's first look at the flink_deploy/deploy/flink-dev/bin/flink script. Everything before the end of the script is preparation; the most important part is the last line:

exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@"

From this command we can see that the Java main class being launched is org.apache.flink.client.CliFrontend, and all of the arguments are passed through to it.

2. Frontend Processing

Find the main function of CliFrontend:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
/**
* Submits the job based on the arguments.
*/
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

try {
final CliFrontend cli = new CliFrontend();
SecurityUtils.install(new SecurityUtils.SecurityConfiguration(cli.config));
int retCode = SecurityUtils.getInstalledContext()
.runSecured(new Callable<Integer>() {
@Override
public Integer call() {
return cli.parseParameters(args);
}
});
System.exit(retCode);
}
catch (Throwable t) {
LOG.error("Fatal error while running command line interface.", t);
t.printStackTrace();
System.exit(31);
}
}

This code creates a CliFrontend instance cli, installs the security context, and then executes cli.parseParameters(args) inside that security context via runSecured (no separate thread is started for this). Let's keep following the call:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
/**
* Parses the command line arguments and starts the requested action.
*
* @param args command line arguments of the client.
* @return The return code of the program
*/
public int parseParameters(String[] args) {
...
// do action
switch (action) {
case ACTION_RUN:
return run(params);
case ACTION_LIST:
return list(params);
case ACTION_INFO:
return info(params);
case ACTION_CANCEL:
return cancel(params);
case ACTION_STOP:
return stop(params);
case ACTION_SAVEPOINT:
return savepoint(params);
...
}
}

The most important branch here is the call to run(params) under ACTION_RUN (there are also other command branches such as list, info, cancel and stop). Following the run command, let's step into the run method:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
protected int run(String[] args) {
...
ClusterClient client = null;
try {

client = createClient(options, program);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());

LOG.debug(options.getSavepointRestoreSettings().toString());

int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("
+ client.getMaxSlots() + "). "
+ "To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}

return executeProgram(program, client, userParallelism);
}
...
}

The method first creates a ClusterClient instance named client. The createClient() method picks a suitable ClusterClient implementation and returns an instance of it. This is where the interaction with YARN happens, and the process is fairly long, so let's first dive into createClient (we will come back to the rest of this method afterwards):

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
/**
* Creates a {@link ClusterClient} object from the given command line options and other parameters.
* @param options Command line options
* @param program The program for which to create the client.
* @throws Exception
*/
protected ClusterClient createClient(
CommandLineOptions options,
PackagedProgram program) throws Exception {

// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());

ClusterClient client;
try {
client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) {
try {
String applicationName = "Flink Application: " + program.getMainClassName();
client = activeCommandLine.createCluster(
applicationName,
options.getCommandLine(),
config,
program.getAllLibraries());
logAndSysout("Cluster started: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
"nor configured in flink-conf.yaml.");
}
}

// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
final InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
}

The getActiveCustomCommandLine method chooses a CustomCommandLine implementation based on the -m parameter. Since the command above specified -m yarn-cluster, the class chosen here is FlinkYarnSessionCli. Its retrieveCluster method then checks whether this application has already been submitted (which is the case when querying application status through the flink command). Because this is a newly submitted application, nothing is found, an UnsupportedOperationException is thrown, the exception is caught, and activeCommandLine.createCluster() is called to create a client object.
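
To make the selection pattern concrete, here is a small, self-contained toy illustration of the "pick the first active command line" idea. The interface and the "-m yarn-cluster" check below are deliberate simplifications for illustration only; the real CustomCommandLine/FlinkYarnSessionCli classes have richer signatures and also consider YARN properties files and application ids.

// Illustration only: a toy version of the "active command line" selection pattern.
// The interface below is NOT the Flink API; it just mirrors the behaviour described above.
import java.util.Arrays;
import java.util.List;

public class CommandLineSelectionDemo {

    interface ToyCommandLine {
        boolean isActive(String[] args);
        String name();
    }

    // Considers itself active when "-m yarn-cluster" is present, like FlinkYarnSessionCli.
    static class ToyYarnSessionCli implements ToyCommandLine {
        public boolean isActive(String[] args) {
            List<String> list = Arrays.asList(args);
            int i = list.indexOf("-m");
            return i >= 0 && i + 1 < args.length && "yarn-cluster".equals(args[i + 1]);
        }
        public String name() { return "FlinkYarnSessionCli"; }
    }

    // Fallback that is always active, like the standalone default.
    static class ToyDefaultCli implements ToyCommandLine {
        public boolean isActive(String[] args) { return true; }
        public String name() { return "DefaultCLI"; }
    }

    static ToyCommandLine getActiveCustomCommandLine(String[] args) {
        // The YARN command line is checked before the default, so "-m yarn-cluster" wins when present.
        for (ToyCommandLine cli : Arrays.asList(new ToyYarnSessionCli(), new ToyDefaultCli())) {
            if (cli.isActive(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No active command line found.");
    }

    public static void main(String[] args) {
        String[] cmd = {"run", "-d", "-m", "yarn-cluster", "-yn", "2"};
        System.out.println(getActiveCustomCommandLine(cmd).name()); // prints FlinkYarnSessionCli
    }
}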

3. Interacting with YARN

Let's step into the createCluster method:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@Override
public YarnClusterClient createCluster(
String applicationName,
CommandLine cmdLine,
Configuration config,
List<URL> userJarFiles) {
Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
yarnClusterDescriptor.setFlinkConfiguration(config);
yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);

try {
return yarnClusterDescriptor.deploy();
} catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}

}

First look at the createDescriptor method. In effect it creates an instance of AbstractYarnClusterDescriptor named yarnClusterDescriptor. We will not step into createDescriptor here, just summarize it briefly: yarnClusterDescriptor is actually a YarnClusterDescriptor instance, and the following information is set on it (a rough sketch follows the list below):

  • the number of TaskManagers
  • the local jar path
  • the files to be shipped
  • the YARN queue to use
  • the JobManager memory size
  • the TaskManager memory size
  • the number of slots per TaskManager
  • the dynamic properties
  • detached mode
  • the application name
  • the ZooKeeper namespace to use
  • the Flink configuration
  • the user's jar files
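
To connect this list back to the original command line, the effect of createDescriptor can be pictured roughly as below. The setter names are assumptions inferred from the field list above rather than verified Flink 1.4 signatures, and the values simply mirror the example command (-yn 2 -ytm 1024 -yjm 2048 -ys 8 -yqu root.flink_cluster.online); treat this as a sketch only.

// Sketch only: setter names are assumed from the field list above, not verified Flink 1.4 API.
import java.io.File;
import java.util.Collections;

import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;

public class DescriptorSketch {

    static AbstractYarnClusterDescriptor buildDescriptor() {
        AbstractYarnClusterDescriptor d = new YarnClusterDescriptor();
        d.setTaskManagerCount(2);                  // -yn 2
        d.setTaskManagerMemory(1024);              // -ytm 1024 (MB)
        d.setJobManagerMemory(2048);               // -yjm 2048 (MB)
        d.setTaskManagerSlots(8);                  // -ys 8
        d.setQueue("root.flink_cluster.online");   // -yqu
        d.setDetachedMode(true);                   // -d
        d.setName("Flink Application: com.bytedance.data.Kafka2ElasticSearch");
        // files to ship, e.g. the log configuration and the lib/ directory (placeholder path)
        d.addShipFiles(Collections.singletonList(new File("/path/to/flink/conf")));
        return d;
    }
}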

The last line calls yarnClusterDescriptor.deploy(); let's follow it:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@Override
public YarnClusterClient deploy() {
try {
if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
// so we check only in ticket cache scenario.
boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
&& useTicketCache && !loginUser.hasKerberosCredentials()) {
LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
"does not have Kerberos credentials");
}
}
return deployInternal();
} catch (Exception e) {
throw new RuntimeException("Couldn't deploy Yarn cluster", e);
}
}

This mainly performs authentication-related checks; we can skip over it and look directly at the deployInternal() function that follows:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
/**
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
*/
protected YarnClusterClient deployInternal() throws Exception {
isReadyForDeployment();
LOG.info("Using values:");
LOG.info("\tTaskManager count = {}", taskManagerCount);
LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);

final YarnClient yarnClient = getYarnClient();

// ------------------ Check if the specified queue exists --------------------

try {
List<QueueInfo> queues = yarnClient.getAllQueues();
if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
boolean queueFound = false;
for (QueueInfo queue : queues) {
if (queue.getQueueName().equals(this.yarnQueue)) {
queueFound = true;
break;
}
}
if (!queueFound) {
String queueNames = "";
for (QueueInfo queue : queues) {
queueNames += queue.getQueueName() + ", ";
}
LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
"Available queues: " + queueNames);
}
} else {
LOG.debug("The YARN cluster does not have any queues configured");
}
} catch (Throwable e) {
LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Error details", e);
}
}

// ------------------ Add dynamic properties to local flinkConfiguraton ------
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}

// ------------------ Check if the YARN ClusterClient has the requested resources --------------

// the yarnMinAllocationMB specifies the smallest possible container allocation size.
// all allocations below this value are automatically set to this value.
final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
"you requested will start.");
}

// set the memory to minAllocationMB to do the next checks correctly
if (jobManagerMemoryMb < yarnMinAllocationMB) {
jobManagerMemoryMb = yarnMinAllocationMB;
}
if (taskManagerMemoryMb < yarnMinAllocationMB) {
taskManagerMemoryMb = yarnMinAllocationMB;
}

// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
if (jobManagerMemoryMb > maxRes.getMemory()) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
}

if (taskManagerMemoryMb > maxRes.getMemory()) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
}

final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources are currently not available in the cluster. " +
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);

}
if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
}
if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
}

// ----------------- check if the requested containers fit into the cluster.

int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i++) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available in the YARN cluster. " +
"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
"the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
}
}

ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);

String host = report.getHost();
int port = report.getRpcPort();

// Correctly initialize the Flink config
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);

// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
}

This method is a key step: the ApplicationMaster/JobManager is launched here. Let's go through it carefully:

  • First, it checks some configuration details to determine whether deployment can proceed.
  • Then getYarnClient produces a yarnClient object, which is responsible for interacting with YARN's ResourceManager (see the sketch after this list).
  • It then checks whether the declared queue exists.
  • Some dynamic properties are added to the Flink configuration.
  • It checks whether the minimum/maximum container memory configured in YARN can satisfy this request.
  • yarnClient.createApplication() is called to ask the RM to create a YarnClientApplication instance.
  • The maximum resources reported in appResponse are compared with the request to decide whether it can be satisfied.
  • Next, startAppMaster is called to start the actual ApplicationMaster; we will step into that method shortly, but let's first finish this one.
  • After the ApplicationMaster has started, the result is wrapped in a YarnClusterClient instance and returned, i.e. back to the createClient() method in flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java.
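
For reference, the client-side YARN calls behind the getYarnClient and createApplication steps above look roughly like this minimal sketch, written against the plain Hadoop 2.x YarnClient API (the configuration is read from yarn-site.xml on the classpath; no Flink code involved):

// Minimal Hadoop 2.x YarnClient sketch: create an application and read the cluster limits.
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnClientSketch {

    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration(); // reads yarn-site.xml from the classpath

        // The client that talks to the ResourceManager (what getYarnClient() provides in Flink).
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Ask the RM for a new application id plus the cluster's resource limits.
        YarnClientApplication application = yarnClient.createApplication();
        GetNewApplicationResponse response = application.getNewApplicationResponse();

        ApplicationId appId = response.getApplicationId();
        Resource maxResource = response.getMaximumResourceCapability();
        System.out.println("New application " + appId
                + ", max container memory " + maxResource.getMemory() + " MB");

        yarnClient.stop();
    }
}

deployInternal uses exactly this kind of response (getMaximumResourceCapability) to reject JobManager/TaskManager memory requests that the cluster can never satisfy.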

Now let's step into startAppMaster. This method is very important, and also very long.

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {

// ------------------ Set default file system scheme -------------------------

try {
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
} catch (IOException e) {
throw new IOException("Error while setting the default " +
"filesystem scheme from configuration.", e);
}

// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);

// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}

ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}

//check if there is a logback or log4j file
File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
final boolean hasLogback = logbackFile.exists();
if (hasLogback) {
systemShipFiles.add(logbackFile);
}

File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
final boolean hasLog4j = log4jFile.exists();
if (hasLog4j) {
systemShipFiles.add(log4jFile);
if (hasLogback) {
// this means there is already a logback configuration file --> fail
LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
}
}

addLibFolderToShipFiles(systemShipFiles);

// Set-up ApplicationSubmissionContext for the application

final ApplicationId appId = appContext.getApplicationId();

// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
String zkNamespace = getZookeeperNamespace();
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}

flinkConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
1));
}

// local resource map for Yarn
final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
// list of remote paths (after upload)
final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();

// upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);

List<String> userClassPaths;
if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}

if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}

// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
LocalResource flinkConf = Records.newRecord(LocalResource.class);
Path remotePathJar =
Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
Path remotePathConf =
Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
localResources.put("flink.jar", appMasterJar);
localResources.put("flink-conf.yaml", flinkConf);

paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);

// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
LocalResource jobgraph = Records.newRecord(LocalResource.class);
Path remoteJobGraph =
Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
localResources.put("job.graph", jobgraph);
paths.add(remoteJobGraph);
classPathBuilder.append("job.graph").append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}

Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');

FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.

//To support Yarn Secure Integration Test Scenario
//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
//and KRB5 configuration files. We are adding these files as container local resources for the container
//applications (JM/TMs) to have proper secure cluster setup
Path remoteKrb5Path = null;
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
if (System.getenv("IN_TESTS") != null) {
String krb5Config = System.getProperty("java.security.krb5.conf");
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);

File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);

hasKrb5 = true;
}
}

// setup security tokens
LocalResource keytabResource = null;
Path remotePathKeytab = null;
String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
keytabResource = Records.newRecord(LocalResource.class);
Path keytabPath = new Path(keytab);
remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
}

final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);

if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
//set tokens only when keytab is not provided
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, conf);
}

amContainer.setLocalResources(localResources);
fs.close();

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());

// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

if (keytabResource != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
}

//To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}

if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}

// set classpath from YARN configuration
Utils.setupYarnClassPath(conf, appMasterEnv);

amContainer.setEnvironment(appMasterEnv);

// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(jobManagerMemoryMb);
capability.setVirtualCores(1);

String name;
if (customName == null) {
name = "Flink session with " + taskManagerCount + " TaskManagers";
if (detached) {
name += " (detached)";
}
} else {
name = customName;
}

appContext.setApplicationName(name);
appContext.setApplicationType("Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}

setApplicationTags(appContext);

// add a hook to clean up in case deployment fails
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);

LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop: while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
case FINISHED: //TODO: the finished state may be valid in flip-6
case KILLED:
throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ appState + " during deployment. \n" +
"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
"yarn logs -applicationId " + appId);
//break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}

}
lastAppState = appState;
Thread.sleep(250);
}
// print the application id for user to cancel themselves.
if (isDetachedMode()) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop " +
"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
"temporary files of the YARN session in the home directoy will not be removed.");
}
// since deployment was successful, remove the hook
try {
Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
} catch (IllegalStateException e) {
// we're already in the shut down hook.
}
return report;
}

This function is long; let's walk through what it does, step by step:

  • Initialize the file system to be used
  • Obtain the ApplicationSubmissionContext instance appContext from yarnApplication
  • Add the files to be shipped, plus the log configuration files and the lib directory
  • Configure high availability via the zkNamespace
  • Assemble the classpath
  • Put the jar, the configuration file, the jobGraph and the security credentials into localResources (a sketch of how a LocalResource is built follows this list)
  • Create the ContainerLaunchContext instance amContainer needed to launch the AM via setupApplicationMasterContainer. This function specifies many important launch parameters for the AM, such as the main class, YarnApplicationMasterRunner.
  • Set the environment variables needed by the AM, including:
    • _CLIENT_TM_MEMORY
    • _CLIENT_TM_COUNT
    • _APP_ID
    • _CLIENT_HOME_DIR
    • _CLIENT_SHIP_FILES
    • _SLOTS
    • _DETACHED
    • _DYNAMIC_PROPERTIES
    • _FLINK_CLASSPATH
    • _FLINK_JAR_PATH
    • _FLINK_YARN_FILES
    • _KEYTAB_PATH
    • _KEYTAB_PRINCIPAL
    • HADOOP_USER_NAME
    • _ZOOKEEPER_NAMESPACE
    • _KRB5_PATH
    • _YARN_SITE_XML_PATH
  • Set the application name, type, AM container spec, AM resources, queue and ApplicationTags on appContext
  • Register a failure-handling shutdown hook (mainly for cleaning up), deploymentFailureHook
  • Call yarnClient.submitApplication(appContext) to submit the application; this is where the AM is actually handed over to YARN for execution
  • Loop, polling the state of the just-submitted AM until it reaches RUNNING
  • Remove the previously registered failure-handling shutdown hook
  • Return the report obtained via yarnClient.getApplicationReport(appId)
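
The bullet about localResources is worth a closer look. A LocalResource is YARN's way of telling the NodeManager which files to download into the container's working directory before launching it; what Utils.setupLocalResource does for flink.jar, flink-conf.yaml and job.graph boils down to the following sketch against the standard Hadoop 2.x API (the HDFS path is a placeholder):

// Register a file that already lives in HDFS as a container-local resource.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class LocalResourceSketch {

    static LocalResource register(Configuration conf, Path hdfsPath) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        FileStatus status = fs.getFileStatus(hdfsPath);

        // Size and timestamp must match the file in HDFS, otherwise the NodeManager rejects it.
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(hdfsPath));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.APPLICATION);
        return resource;
    }
}

The returned object is then put into the localResources map under a key such as "flink.jar", and the whole map is attached to the container via amContainer.setLocalResources(localResources), exactly as in the code above.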

4. YarnApplicationMasterRunner

After the flow above, YARN starts a container to run the ApplicationMaster. Once started, the ApplicationMaster talks to the RM and NMs to request resources and manages the execution of the Flink job. Let's now look at the ApplicationMaster source code, starting with its main function:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
/**
* The entry point for the YARN application master.
*
* @param args The command line arguments.
*/
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);

// run and exit with the proper return code
int returnCode = new YarnApplicationMasterRunner().run(args);
System.exit(returnCode);
}

It first prints some environment information, then registers a signal handler and a JVM shutdown safeguard, and finally calls new YarnApplicationMasterRunner().run(args). Let's keep going:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
/**
* The instance entry point for the YARN application master. Obtains user group
* information and calls the main work method {@link #runApplicationMaster(Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
* @return The process exit code.
*/
protected int run(String[] args) {
try {
LOG.debug("All environment variables: {}", ENV);

final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
require(yarnClientUsername != null, "YARN client user name environment variable {} not set",
YarnConfigKeys.ENV_HADOOP_USER_NAME);

final String currDir = ENV.get(Environment.PWD.key());
require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
LOG.debug("Current working Directory: {}", currDir);

final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath);

final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);

String keytabPath = null;
if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.debug("keytabPath: {}", keytabPath);
}

UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();

LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
currentUser.getShortUserName(), yarnClientUsername);

// Flink configuration
final Map<String, String> dynamicProperties =
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
LOG.debug("YARN dynamic properties: {}", dynamicProperties);

final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);

// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
if (keytabPath != null && remoteKeytabPrincipal != null) {
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}

org.apache.hadoop.conf.Configuration hadoopConfiguration = null;

//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
if (krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
}

SecurityUtils.SecurityConfiguration sc;
if (hadoopConfiguration != null) {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
} else {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
}

SecurityUtils.install(sc);

return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
public Integer call() {
return runApplicationMaster(flinkConfig);
}
});

}
catch (Throwable t) {
// make sure that everything whatever ends up in the log
LOG.error("YARN Application Master initialization failed", t);
return INIT_ERROR_EXIT_CODE;
}
}

This function mostly validates parameters and environment variables; at the end it calls runApplicationMaster(flinkConfig) through runSecured, i.e. inside the installed security context (no new thread is started here). That method is the core of the ApplicationMaster's logic, and its source is fairly long; let's continue:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
/**
* The main work method, must run as a privileged action.
*
* @return The return code for the Java process.
*/
protected int runApplicationMaster(Configuration config) {
ActorSystem actorSystem = null;
WebMonitor webMonitor = null;
HighAvailabilityServices highAvailabilityServices = null;

int numberProcessors = Hardware.getNumberCPUCores();

final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
new ExecutorThreadFactory("yarn-jobmanager-future"));

final ExecutorService ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
new ExecutorThreadFactory("yarn-jobmanager-io"));

try {
// ------- (1) load and parse / validate all configurations -------

// loading all config values here has the advantage that the program fails fast, if any
// configuration problem occurs

final String currDir = ENV.get(Environment.PWD.key());
require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());

// Note that we use the "appMasterHostname" given by YARN here, to make sure
// we use the hostnames given by YARN consistently throughout akka.
// for akka "localhost" and "localhost.localdomain" are different actors.
final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
require(appMasterHostname != null,
"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());

LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);

//Update keytab and principal path to reflect YARN container path location
final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);

final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);

String keytabPath = null;
if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.info("keytabPath: {}", keytabPath);
}
if (keytabPath != null && remoteKeytabPrincipal != null) {
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}

// Hadoop/Yarn configuration (loads config data automatically from classpath files)
final YarnConfiguration yarnConfig = new YarnConfiguration();

final int taskManagerContainerMemory;
final int numInitialTaskManagers;
final int slotsPerTaskManager;

try {
taskManagerContainerMemory = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
} catch (NumberFormatException e) {
throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_MEMORY + " : "
+ e.getMessage());
}
try {
numInitialTaskManagers = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
} catch (NumberFormatException e) {
throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_COUNT + " : "
+ e.getMessage());
}
try {
slotsPerTaskManager = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
} catch (NumberFormatException e) {
throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_SLOTS + " : "
+ e.getMessage());
}

final ContaineredTaskManagerParameters taskManagerParameters =
ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);

LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots());
LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
"JVM direct memory limit {} MB",
taskManagerParameters.taskManagerTotalMemoryMB(),
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());

// ----------------- (2) start the actor system -------------------

// try to start the actor system, JobManager and JobManager actor system
// using the port range definition from the config.
final String amPortRange = config.getString(
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);

actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);

final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();

LOG.info("Actor system bound to hostname {}.", akkaHostname);

// ---- (3) Generate the configuration for the TaskManagers

final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
LOG.debug("TaskManager configuration: {}", taskManagerConfig);

final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext(
config, yarnConfig, ENV,
taskManagerParameters, taskManagerConfig,
currDir, getTaskManagerClass(), LOG);

// ---- (4) start the actors and components in this order:

// 1) JobManager & Archive (in non-HA case, the leader service takes this)
// 2) Web Monitor (we need its port to register)
// 3) Resource Master for YARN
// 4) Process reapers for the JobManager and Resource Master

// 0: Start the JobManager services

// update the configuration used to create the high availability services
config.setString(JobManagerOptions.ADDRESS, akkaHostname);
config.setInteger(JobManagerOptions.PORT, akkaPort);

highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
ioExecutor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

// 1: the JobManager
LOG.debug("Starting JobManager actor");

// we start the JobManager with its standard name
ActorRef jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
futureExecutor,
ioExecutor,
highAvailabilityServices,
new Some<>(JobMaster.JOB_MANAGER_NAME),
Option.<String>empty(),
getJobManagerClass(),
getArchivistClass())._1();

// 2: the web monitor
LOG.debug("Starting Web Frontend");

webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
actorSystem,
jobManager,
LOG);

String protocol = "http://";
if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
protocol = "https://";
}
final String webMonitorURL = webMonitor == null ? null :
protocol + appMasterHostname + ":" + webMonitor.getServerPort();

// 3: Flink's Yarn ResourceManager
LOG.debug("Starting YARN Flink Resource Manager");

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
getResourceManagerClass(),
config,
yarnConfig,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
appMasterHostname,
webMonitorURL,
taskManagerParameters,
taskManagerContext,
numInitialTaskManagers,
LOG);

ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);

// 4: Process reapers
// The process reapers ensure that upon unexpected actor death, the process exits
// and does not stay lingering around unresponsive

LOG.debug("Starting process reapers for JobManager and YARN Application Master");

actorSystem.actorOf(
Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
"YARN_Resource_Master_Process_Reaper");

actorSystem.actorOf(
Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
"JobManager_Process_Reaper");
}
catch (Throwable t) {
// make sure that everything whatever ends up in the log
LOG.error("YARN Application Master initialization failed", t);

if (webMonitor != null) {
try {
webMonitor.stop();
} catch (Throwable ignored) {
LOG.warn("Failed to stop the web frontend", t);
}
}

if (actorSystem != null) {
try {
actorSystem.shutdown();
} catch (Throwable tt) {
LOG.error("Error shutting down actor system", tt);
}
}

futureExecutor.shutdownNow();
ioExecutor.shutdownNow();

return INIT_ERROR_EXIT_CODE;
}

// everything started, we can wait until all is done or the process is killed
LOG.info("YARN Application Master started");

// wait until everything is done
actorSystem.awaitTermination();

// if we get here, everything work out jolly all right, and we even exited smoothly
if (webMonitor != null) {
try {
webMonitor.stop();
} catch (Throwable t) {
LOG.error("Failed to stop the web frontend", t);
}
}

if (highAvailabilityServices != null) {
try {
highAvailabilityServices.close();
} catch (Throwable t) {
LOG.error("Failed to stop the high availability services.", t);
}
}

org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
futureExecutor,
ioExecutor);

return 0;
}

Let's go through the logic of this code piece by piece:

  • First, it validates the configuration, including the working directory, hostname, keytab location and the TaskManager settings
  • Then it starts the actor system
  • Utils.createTaskExecutorContext is called to generate the launch information taskManagerContext for the TaskManagers
  • It then starts the actors and components (a minimal Akka actor sketch follows this list):
    • First the jobManager actor, an instance of the JobManager class.
    • Then the web monitor that serves the UI; its address is handed to the YarnFlinkResourceManager started next.
    • Then the resourceMaster actor, an instance of YarnFlinkResourceManager; this is the class that interacts with YARN.
    • Finally, two process reapers are created to clean up when an actor dies unexpectedly.
  • At this point the Flink cluster is up; what remains is waiting for the job to finish and then shutting everything down
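
Since the rest of this section walks through the actors' preStart and message-handling methods, here is a minimal Akka lifecycle sketch, assuming the classic UntypedActor Java API of the Akka 2.4.x line that Flink 1.4 builds on: preStart runs once when the actor is created, and every message sent to the actor afterwards is handled by onReceive.

// Minimal Akka (2.4.x, classic UntypedActor API) lifecycle sketch.
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class ActorLifecycleDemo {

    public static class EchoActor extends UntypedActor {

        @Override
        public void preStart() {
            // One-time initialization; FlinkResourceManager registers its leader listener
            // and calls initialize() at this point.
            System.out.println("preStart: one-time initialization");
        }

        @Override
        public void onReceive(Object message) {
            // Called for every incoming message; Flink's handleMessage is reached from
            // the actor's message handling in the same way.
            System.out.println("onReceive: " + message);
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef echo = system.actorOf(Props.create(EchoActor.class), "echo");
        echo.tell("ContainersAllocated", ActorRef.noSender()); // handled asynchronously by onReceive
        Thread.sleep(500);
        system.terminate();
    }
}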

Now let's focus on the YarnFlinkResourceManager class mentioned above. Since it is an actor, we start from the preStart lifecycle method; YarnFlinkResourceManager does not override preStart, so we trace it to its parent class FlinkResourceManager:

// Location: flink/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@Override
public void preStart() {
try {
// we start our leader retrieval service to make sure we get informed
// about JobManager leader changes
leaderRetriever.start(new LeaderRetrievalListener() {

@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
self().tell(
new NewLeaderAvailable(leaderAddress, leaderSessionID),
ActorRef.noSender());
}

@Override
public void handleError(Exception e) {
self().tell(
new FatalErrorOccurred("Leader retrieval service failed", e),
ActorRef.noSender());
}
});

// framework specific initialization
initialize();

}
catch (Throwable t) {
self().tell(
new FatalErrorOccurred("Error during startup of ResourceManager actor", t),
ActorRef.noSender());
}
}

In this method, a callback is first registered to listen for JobManager leader changes, and then initialize is called. Since YarnFlinkResourceManager overrides this method, the call lands in YarnFlinkResourceManager; let's follow it:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@Override
protected void initialize() throws Exception {
LOG.info("Initializing YARN resource master");

resourceManagerCallbackHandler.initialize(self());

resourceManagerClient.init(yarnConfig);
resourceManagerClient.start();

// create the client to communicate with the node managers
nodeManagerClient.init(yarnConfig);
nodeManagerClient.start();
nodeManagerClient.cleanupRunningContainersOnStop(true);

// register with Resource Manager
LOG.info("Registering Application Master with tracking url {}", webInterfaceURL);

scala.Option<Object> portOption = AkkaUtils.getAddress(getContext().system()).port();
int actorSystemPort = portOption.isDefined() ? (int) portOption.get() : -1;

RegisterApplicationMasterResponse response = resourceManagerClient.registerApplicationMaster(
applicationMasterHostName, actorSystemPort, webInterfaceURL);

// if this application master starts as part of an ApplicationMaster/JobManager recovery,
// then some worker containers are most likely still alive and we can re-obtain them
List<Container> containersFromPreviousAttempts =
applicationMasterResponseReflector.getContainersFromPreviousAttempts(response);

if (!containersFromPreviousAttempts.isEmpty()) {
LOG.info("Retrieved {} TaskManagers from previous attempt", containersFromPreviousAttempts.size());

final long now = System.currentTimeMillis();
for (Container c : containersFromPreviousAttempts) {
YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(c, now);
containersInLaunch.put(containerInLaunch.getResourceID(), containerInLaunch);
}

// adjust the progress indicator
updateProgress();
}
}

Several key members are initialized here (a sketch of how such clients are typically created follows the list):

  • resourceManagerCallbackHandler: handles the message callbacks coming from the YARN RM
  • resourceManagerClient: used to send requests to the YARN RM
  • nodeManagerClient: used to send requests to the YARN NMs
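
For context, creating and registering such clients with the plain Hadoop 2.x asynchronous AM/NM client API looks roughly like the sketch below (it is only meaningful when run inside an AM container launched by YARN; hostname, port and tracking URL are placeholders):

// Sketch: asynchronous RM client with a callback handler, plus an NM client, then AM registration.
import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmRegistrationSketch {

    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();

        // The RM's answers (allocated/completed containers) arrive in this handler,
        // which is the role YarnResourceManagerCallbackHandler plays in Flink.
        AMRMClientAsync.CallbackHandler handler = new AMRMClientAsync.CallbackHandler() {
            public void onContainersAllocated(List<Container> containers) {
                System.out.println("Allocated " + containers.size() + " container(s)");
            }
            public void onContainersCompleted(List<ContainerStatus> statuses) { }
            public void onShutdownRequest() { }
            public void onNodesUpdated(List<NodeReport> updatedNodes) { }
            public void onError(Throwable e) { }
            public float getProgress() { return 0.0f; }
        };

        // Client towards the ResourceManager (heartbeat every 5 s) -- Flink's resourceManagerClient.
        AMRMClientAsync<AMRMClient.ContainerRequest> rmClient =
                AMRMClientAsync.createAMRMClientAsync(5000, handler);
        rmClient.init(conf);
        rmClient.start();

        // Client towards the NodeManagers -- Flink's nodeManagerClient.
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        // Registration tells the RM where the AM lives and which tracking URL to advertise.
        RegisterApplicationMasterResponse response =
                rmClient.registerApplicationMaster("am-host", 0, "http://am-host:8081");
        System.out.println("Max capability: " + response.getMaximumResourceCapability());
    }
}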

Next, resourceManagerClient.registerApplicationMaster is called to register the current ApplicationMaster with the YARN RM. Let's step into this method (its implementation lives in the Hadoop source):

// Location: .m2/repository/org/apache/hadoop/hadoop-yarn-client/2.4.1/hadoop-yarn-client-2.4.1-sources.jar!/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
RegisterApplicationMasterResponse response = this.client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
this.heartbeatThread.start();
return response;
}

This first calls this.client.registerApplicationMaster to register with the RM, and then calls heartbeatThread.start() to start the heartbeat service. By default, the pending container requests ride along with the AM-to-RM heartbeats.
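
Concretely, "riding along with the heartbeat" means that requests added via addContainerRequest are only queued locally and are sent to the RM on the next heartbeat; allocations then come back through the callback handler. A minimal sketch against the Hadoop 2.x API (memory/vcores values are placeholders):

// Queue one TaskManager-sized container request; it goes out with the next AM->RM heartbeat.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.util.Records;

public class ContainerRequestSketch {

    static void requestTaskManagerContainer(
            AMRMClientAsync<AMRMClient.ContainerRequest> rmClient, int memoryMb, int vcores) {

        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(memoryMb);
        capability.setVirtualCores(vcores);

        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(0);

        // null node/rack lists mean "anywhere in the cluster".
        rmClient.addContainerRequest(
                new AMRMClient.ContainerRequest(capability, null, null, priority));
        // Once the RM grants a container, the registered CallbackHandler's onContainersAllocated()
        // fires, which in Flink is what eventually reaches containersAllocated() further below.
    }
}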

Once the RM has confirmed which containers can be allocated, the onContainersAllocated callback of resourceManagerCallbackHandler is invoked, as follows:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
@Override
public void onContainersAllocated(List<Container> containers) {
if (yarnFrameworkMaster != null) {
yarnFrameworkMaster.tell(
new ContainersAllocated(containers),
ActorRef.noSender());
}
}

Following the message into YarnFlinkResourceManager:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@Override
protected void handleMessage(Object message) {

// check for YARN specific actor messages first

if (message instanceof ContainersAllocated) {
containersAllocated(((ContainersAllocated) message).containers());

} else if (message instanceof ContainersComplete) {
containersComplete(((ContainersComplete) message).containers());

} else {
// message handled by the generic resource master code
super.handleMessage(message);
}
}

This in turn calls the containersAllocated method:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
private void containersAllocated(List<Container> containers) {
final int numRequired = getDesignatedWorkerPoolSize();
final int numRegistered = getNumberOfStartedTaskManagers();

for (Container container : containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);

// decide whether to return the container, or whether to start a TaskManager
if (numRegistered + containersInLaunch.size() < numRequired) {
// start a TaskManager
final YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(container);
final ResourceID resourceID = containerInLaunch.getResourceID();
containersInLaunch.put(resourceID, containerInLaunch);

String message = "Launching TaskManager in container " + containerInLaunch
+ " on host " + container.getNodeId().getHost();
LOG.info(message);
sendInfoMessage(message);

try {
// set a special environment variable to uniquely identify this container
taskManagerLaunchContext.getEnvironment()
.put(ENV_FLINK_CONTAINER_ID, resourceID.getResourceIdString());
nodeManagerClient.startContainer(container, taskManagerLaunchContext);
}
catch (Throwable t) {
// failed to launch the container
containersInLaunch.remove(resourceID);

// return container, a new one will be requested eventually
LOG.error("Could not start TaskManager in container " + containerInLaunch, t);
containersBeingReturned.put(container.getId(), container);
resourceManagerClient.releaseAssignedContainer(container.getId());
}
} else {
// return excessive container
LOG.info("Returning excess container {}", container.getId());
containersBeingReturned.put(container.getId(), container);
resourceManagerClient.releaseAssignedContainer(container.getId());
}
}

updateProgress();

// if we are waiting for no further containers, we can go to the
// regular heartbeat interval
if (numPendingContainerRequests <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}

// make sure we re-check the status of workers / containers one more time at least,
// in case some containers did not come up properly
triggerCheckWorkers();
}

This method uses nodeManagerClient to actually launch, on the individual NodeManagers, the containers that YARN has granted. At this point, all the TaskManagers are up and running.
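
The actual launch goes through NMClient.startContainer with a ContainerLaunchContext describing the command, environment and local resources of the TaskManager process. A stripped-down sketch against the Hadoop 2.x API (the command string is a placeholder; in Flink, taskManagerLaunchContext was already prepared by Utils.createTaskExecutorContext):

// Launch a process in a container that the RM has granted to this AM.
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;

public class StartContainerSketch {

    static void launch(NMClient nmClient, Container container, String command) throws Exception {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        // <LOG_DIR> is expanded by YARN to the container's log directory.
        ctx.setCommands(Collections.singletonList(
                command + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));
        // ctx.setLocalResources(...) and ctx.setEnvironment(...) would be filled the same way
        // as for the ApplicationMaster container shown earlier.
        nmClient.startContainer(container, ctx);
    }
}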

Now let's go back to where we started and find CliFrontend's executeProgram method, which, once the flow above has finished, is used to submit a Flink job to the JobManager.

5. Submitting the Flink Job

The flow above has started all of the roles of the Flink cluster; the next step is to submit a job to this cluster.

Let's continue with the executeProgram function, whose main job is to let the client submit the program to the JobManager via an actor:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
logAndSysout("Starting execution of program");

JobSubmissionResult result;
try {
result = client.run(program, parallelism);
} catch (ProgramParametrizationException e) {
return handleParametrizationException(e);
} catch (ProgramMissingJobException e) {
return handleMissingJobException();
} catch (ProgramInvocationException e) {
return handleError(e);
} finally {
program.deleteExtractedLibraries();
}

if (null == result) {
logAndSysout("No JobSubmissionResult returned, please make sure you called " +
"ExecutionEnvironment.execute()");
return 1;
}

if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
JobExecutionResult execResult = result.getJobExecutionResult();
System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
}
} else {
logAndSysout("Job has been submitted with JobID " + result.getJobID());
}

return 0;
}

The most important part of this function is the call result = client.run(program, parallelism); the rest is just result collection. Here client is a YarnClusterClient, but since it does not override run(program, parallelism), we follow the call into the base class:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
throws ProgramInvocationException, ProgramMissingJobException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {

final JobWithJars jobWithJars;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
jobWithJars = prog.getPlanWithoutJars();
} else {
jobWithJars = prog.getPlanWithJars();
}

return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
log.info("Starting program in interactive mode");

final List<URL> libraries;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
libraries = Collections.emptyList();
} else {
libraries = prog.getAllLibraries();
}

ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
prog.getSavepointSettings());
ContextEnvironment.setAsContext(factory);

try {
// invoke main method
prog.invokeInteractiveModeForExecution();
if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
throw new ProgramMissingJobException();
}
if (isDetached()) {
// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
}
else {
// in blocking mode, we execute all Flink jobs contained in the user code and then return here
return this.lastJobExecutionResult;
}
}
finally {
ContextEnvironment.unsetContext();
}
}
else {
throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
}
}

This first checks whether an entry class was provided and chooses a mode accordingly: with an entry class it runs in jar-execution mode, without one it runs in interactive programming mode. Given the earlier command, the entry-class branch is taken here. Let's continue into run(jobWithJars, parallelism, prog.getSavepointSettings()):

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}

OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

An optimized plan is produced here (we will skip the plan details and stay focused on the Flink on YARN flow), and then another run method is called; let's follow it:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}

A JobGraph is produced here and the job is submitted. Since submitJob is overridden in the subclass YarnClusterClient, let's follow the call there:

// Location: flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
if (isDetached()) {
if (newlyCreatedCluster) {
stopAfterJob(jobGraph.getJobID());
}
return super.runDetached(jobGraph, classLoader);
} else {
return super.run(jobGraph, classLoader);
}
}

It first checks whether we are running in detached mode (the -d parameter). Detached means that once the ApplicationMaster has started in YARN, the client disconnects from it; otherwise the connection is kept open, and if the attached command-line process exits, the job running on YARN is killed. Following the -d parameter set at the beginning, we take the isDetached branch.

If this cluster was just created (the new-job case, as opposed to querying an existing job), stopAfterJob is called to tell the ApplicationMaster to shut down once this job has finished.

Next, let's step into super.runDetached:

// Location: flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
/**
* Submits a JobGraph detached.
* @param jobGraph The JobGraph
* @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
* @return JobSubmissionResult
* @throws ProgramInvocationException
*/
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

waitForClusterToBeReady();

final ActorGateway jobManagerGateway;
try {
jobManagerGateway = getJobManagerGateway();
} catch (Exception e) {
throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
}

try {
logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
JobClient.submitJobDetached(jobManagerGateway, flinkConfig, jobGraph, timeout, classLoader);
return new JobSubmissionResult(jobGraph.getJobID());
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}
}

waitForClusterToBeReady() waits until all TaskManagers have started. Then the JobManager gateway is obtained, and a JobGraph is submitted via JobClient.submitJobDetached. From here on we are inside Flink's internal logic, which will be analyzed another time.


Postscript: I wasted far too much time around registerApplicationMaster, unable to find the point where container resource requests are sent to the YARN RM, and only later discovered that they are carried along with the heartbeats.