Flink on YARN源码分析(基于1.4版本)
liyakun.hit
2017-12-14
1. 从命令行开始
在Flink上面提交任务需要使用下面的命令:
flink_deploy/deploy/flink-dev/bin/flink run -d -c com.bytedance.data.Kafka2ElasticSearch -m yarn-cluster -yn 2 -ytm 1024 -yjm 2048 -ys 8 -yqu root.flink_cluster.online ~/wordcount/target/wordcount-1.0-SNAPSHOT.jar
先看一下flink_deploy/deploy/flink-dev/bin/flink这个脚本,脚本里面前面都是准备工作,最重要的是最后一行命令:
1 | exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@" |
通过这个命令,可以看到要启动的JAVA主类的入口是:org.apache.flink.client.CliFrontend,然后把参数都传递过去。
2. 前端处理
找到CliFrontend的main函数:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java |
这段代码主要是创建了一个CliFrontend类的实例cli,然后新启动一个线程来执行cli.parseParameters(args),下面继续跟进:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java |
这里最重要的处理是调用ACTION_RUN条件下面的run(params)方法,(这里同时还有其它的命令分支,比如list,info,cancel,stop等),这里沿着run命令继续跟进到run方法内部:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java |
这里首先是创建了一个ClusterClient类的实例client,这个createClient()方法会选择一个合适的ClusterClient的实现类,然后返回它的实例。这个函数里面会处理与YARN的交互,过程比较长,下面跟进分析到createClient里面(之后再继续分析当前方法内部的剩余源码):
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java |
这里getActiveCustomCommandLine方法是根据-m参数来获得一个CustomCommandLine的实现类,这里由于之前的命令里面指定的是-m yarn-cluster,所以最终选择的是:FlinkYarnSessionCli这个类。然后使用FlinkYarnSessionCli这个类的retrieveCluster方法找一下是否已经提交过这个应用了(这在通过flink命令查询应用状态时成立),这里因为是新提交的应用,所以找不到,会抛出UnsupportedOperationException异常,然后程序接住这个异常,通过调用activeCommandLine.createCluster()方法来创建一个client对象。
3. 与Yarn交互
下面继续跟进到createCluster方法内部:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java |
首先看一下createDescriptor方法,从结果上来看它创建了一个AbstractYarnClusterDescriptor的实例yarnClusterDescriptor。这里不再跟进到createDescriptor方法内部了,只是简单在这里介绍一下吧。 yarnClusterDescriptor其实是YarnClusterDescriptor的实例,其对象内部添加了如下信息:
- taskmananger的数量
- 本地jar路径
- 需要传输的文件
- 要使用的yarn的队列
- jobmanager的占用内存数
- taskmanager的占用内存数
- taskmanager的slot数
- dynamicproperties
- detached模式
- 应用名
- 使用的zk的namespace
- Flink的config
- 用户的jar文件
最后一行调用了yarnClusterDescriptor.deploy()方法,继续跟进:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java |
这里主要是进行了认证相关的工作,跳过它直接看接下来的deployInternal()函数:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java |
这个方法是一个非常关键的步骤。ApplicationMaster/JobManager会在这里启动。下面仔细分析一下:
- 首先是根据一些配置详情判定一下是否可以部署。
- 然后通过getYarnClient方法,产生一个yarnClient对象,这个对象负责跟YARN的ResourceManager进行交互。
- 然后获取一下声明的队列是否存在
- 添加一些动态属性到flink的配置中
- 判断一下Yarn配置里面的container的最大最小内存能否满足本次申请
- 调用yarnClient.createApplication()方法来向RM申请创建一个YarnClientApplication的实例
- 通过appResponse里面最大能提供的资源与需求进行对比,判定能否满足需求
- 接下就是调用startAppMaster方法来启动真正的appmaster了,这个方法稍后跟进,先继续往下看。
- 在appmaster启动后,把返回结果封闭进一个YarnClusterClient类的实例中返回回去。这里也就是返回到flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java的createClient()方法。
下面跟进到startAppMaster方法内部,这个方法非常重要,也非常长。
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java |
这个函数很长,下面依次分析一下它都做了什么:
- 初始化要使用的文件系统
- 获得yarnApplication的ApplicationSubmissionContext类的实例appContext
- 添加需要传输的文件,附加上log的配置文件、lib目录
- 通过zkNamespace来配置高可用
- 整理classpath
- 把jar,配置文件,jobGraph,认证信息放入到localResources中
- 通过setupApplicationMasterContainer函数来创建am启动需要的ContainerLaunchContext类的amContainer实例。这个函数里面指定了am启动时的很多重要参数,比如主类:YarnApplicationMasterRunner等。
- 设置am启动需要的Env,包括:
- _CLIENT_TM_MEMORY
- _CLIENT_TM_COUNT
- _APP_ID
- _CLIENT_HOME_DIR
- _CLIENT_SHIP_FILES
- _SLOTS
- _DETACHED
- _DYNAMIC_PROPERTIES
- _FLINK_CLASSPATH
- _FLINK_JAR_PATH
- _FLINK_YARN_FILES
- _KEYTAB_PATH
- _KEYTAB_PRINCIPAL
- HADOOP_USER_NAME
- _ZOOKEEPER_NAMESPACE
- _KRB5_PATH
- _YARN_SITE_XML_PATH
- 设置appContext的appname,type,amcontainer信息,am资源,队列,ApplicationTags
- 设置一个失败处理的回调函数(主要是清理现场)deploymentFailureHook
- 调用yarnClient.submitApplication(appContext)提交应用。这里就是正式的把am提交给yarn去执行了。
- 循环等待来获取刚刚提交的am任务的状态,直到取得RUNNING
- 删除掉之前注册的失败处理的回调函数
- 返回通过yarnClient.getApplicationReport(appId)获得的report
4. YarnApplicationMasterRunner
经过上面的流程,会在YARN中启动一个container,用来运行appmaster,这个appmaster启动之后,会跟RM和NM交互申请资源,并且管理Flinkjob的运行。下面进入AppMaster的源码分析,首先是它的main函数:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java |
首先是打印一些提示日志,然后注册一个失败时的清理钩子,最后是调用了YarnApplicationMasterRunner().run(args)方法,下面继续跟进:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java |
这个函数主要是在做参数检测,最后通过runSecured方法,调用启动了一个线程,用来执行runApplicationMaster(flinkConfig),这个方法是appmaster所有逻辑中最核心的部分,源码也比较长,下面继续跟进:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java |
下面一一分析这段源码中的处理逻辑:
- 首先是校验一下各项配置是否正常,包括工作目录、主机名、keytab位置、taskmanager的配置信息
- 然后启动actor system
- 通过调用Utils.createTaskExecutorContext方法为taskmanager生成启动信息taskManagerContext
- 启动actors和components
- 首先启动actor:jobmanager。它是JobManager类的实例。
- 然后启动web服务,用来展示界面,并把服务地址告知接下来要启动的YarnFlinkResourceManager
- 然后启动actor:resourceMaster。它是YarnFlinkResourceManager的实例。它是用来跟YARN进行交互的类。
- 最后创建两个用来在异常发生时进行清理工作的reaper。
- 这样Flink集群算是启动起来了,然后就是等待任务执行结束,再进行退出的逻辑
下面重点介绍一下刚刚提到的YarnFlinkResourceManager类。由于它是一个Actor对象,先从它的生命周期的prestart开始看,由于它没有prestart方法,追踪到它的父类FlinkResourceManager:
1 | //位置:flink/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java |
在这个方法中,先是声明了一个回调用来监听jobmanager的Leader的变化,然后调用了initialize方法,由于YarnFlinkResourceManager类中覆盖了这个方法,因此,会调用到YarnFlinkResourceManager中,继续跟进:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java |
这里初始化了以下几个关键的成员:
- resourceManagerCallbackHandler:用来处理YARN RM发过来的消息回调的
- resourceManagerClient:用来向YARN RM发送请求的
- nodeManagerClient:用来向YARN NM发送请求的
接下来,调用了resourceManagerClient.registerApplicationMaster方法来向YARN RM注册当前的AppMaster。下面继续跟进到这个方法里面(这个方法的实现在hadoop的源码中):
1 | //位置:.m2/repository/org/apache/hadoop/hadoop-yarn-client/2.4.1/hadoop-yarn-client-2.4.1-sources.jar!/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java |
这里面首先是调用了this.client.registerApplicationMaster向RM申请注册,然后调用了heartbeatThread.start用来启动心跳服务。默认的,AM与RM的心跳中会携带要申请的container的信息。
在RM确认了可以分配的container信息后,会回调resourceManagerCallbackHandler的onContainersAllocated方法如下:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java |
继续跟进到YarnFlinkResourceManager中:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java |
这里会调用containersAllocated方法:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java |
这个方法就是在通过nodeManagerClient来在各个nodemanager上面把由yarn返回的结果,进行真正的拉起。到了这里,taskmanager就都启动起来了
下面回到一开始的地方,找到CliFrontend的executeProgram方法,这个方法是在刚刚的流程结束之后,用来向jobmanager提交一个flink的job。
5. 提交flink job
上面的流程中已经完成了整个flink cluster的各个角色的启动,接下来,是向这个cluster提交一个job。
下面继续跟进executeProgram函数,它的主要功能是让Client通过Actor提交程序到JobManager:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java |
这个函数中最重要的是调用了result = client.run(program, parallelism),剩下的都是一些收集的工作,这里的client是YarnClusterClient,但是由于没有覆盖run(program, parallelism)这个方法,下面继续跟进到基类中的client.run(program, parallelism):
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java |
这里首先是判断有没有提供入口类,然后选择模式,如果提供了入口类,就是执行jar的模式,如果没有提供,那就是交互式的编程模式。根据前面的命令,这里选择有入口类的分支。然后继续跟进到run(jobWithJars, parallelism, prog.getSavepointSettings())方法内部:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java |
这里产生了一个优化过的plan(先跳过plan的部分,专注在flink on yarn的流程中),然后继续调用了run方法,继续跟进:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java |
这里产生了一个JobGraph,然后提交job,继续跟进到父类YarnClusterClient中:
1 | //位置:flink/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java |
这里先是判断是否是detached(根据参数-d),detach的意思是,在appmaster在yarn里面启动之后,就断开与client的连接;否则就一直保持连接。如果一直保持连接的话,client这里的命令行进程退出,会杀死yarn上运行的任务。下面按照开始时设置的参数-d进入到isDetached判断内部。
如果这个cluster是刚刚被创建的(创建新任务的情况;反之是查询历史任务的情况),就调用一下stopAfterJob,用来通知appmaster在运行这个job结束之后就退出。
接下来,继续跟进到super.runDetached方法内部:
1 | //位置:flink/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java |
waitForClusterToBeReady()是在等待所有taskmanager都启动起来。然后找到jobManager的gateway,通过JobClient.submitJobDetached方法提交一个jobGraph。这里已经是Flink的内部逻辑了,以后再继续分析。
后记:在registerApplicationMaster的地方浪费了太多时间,找不到向yarn rm发出container资源申请的点,后面发现原来是跟着心跳一起的。