首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Flink on Yarn模式启动流程源代码分析(2)

Flink on Yarn模式启动流程源代码分析(2)

接着看下是如何构造YarnClusterDescriptor的

----------------- **1 creat YarnClusterDescriptor ** ----------------------

直接new YarnClusterDescriptor对象,然后将依赖jar地址,配置参数如taskmanager个数,jar地址,配置文件地址,配置参数等设置到YarnClusterDescriptor对象中去,然后返回这个对象。

------------** 2 YarnClusterDescriptor deploy ** -------------------------

由于YarnClusterDescriptor没有重写depoy方法则直接调用其父类AbstractYarnClusterDescriptor的deploy方法,但是最终调用的是其deployInternal方法.

接着看下deployInternal方法,简单的描述下流程,后续代码分析下面的github地址

    检查是否具备Deploy的条件,如配置文件,jar路径是否为空

    获取yarn的client,用户和RM进行通信

    增加动态的配置属性到配置conf对象中去,解析配置conf对象为kv对

    获取HDFS FileSyetem,这里用于将本地jar及配置文件上传到HDFS,

    判断JobManager和TaskManager申请的资源是否满足yarn分配单个container的最小分配,如果小于则将container最小分配用来初始化jobMananger和TaskMananer

    通过yarn client创建Application,返回GetNewApplicationResponse对象用于跟RM进行RPC通信。

    通过GetNewApplicationResponse对象获取RM能够为这个应用分配的最大资源,如果最大资源不能够满足jobManagerMemoryMb和taskManagerMemoryMb则报错,计算总的jobmanager和所有taskmanager总共需要的资源(jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount),计算RM中总共的空闲资源,判断空闲资源是否满足前面计算需要的需求,如果不满足,则可能先启动yarn session,task manager等到有资源再进行启动;先为jobManager分配一个nm,然后再在其他的nm上启动taskmanager

    设置启动ApplicationMaster的 lanchcontext,这里主要是设置java home,主类,jvm参数数,log文件配置。ApplicationMaster的主类 YarnApplicationMasterRunner ** YarnApplicationMasterRunner **。

    protected Class<?> getApplicationMasterClass() {   
    return YarnApplicationMasterRunner.class;
    }```
    - 设置ApplicationSubmissionContext,获取ApplicationId
    - 设置session需要的hdfs路径,然后将本地jar包及配置文件,配置文件上传到HDFS
    - 设置AM启动的token信息,设置AM启动的过程中需要从hdfs下载那些依赖的jar和配置文件,设置ApplicationMaster及Flink及其他进程的classpath,不多说
    - 设置钩子函数在deploy的时候清理上传到hdfs的文件及本地下载的依赖文件
    - *** 重点,提交Applicaiton到RM;设置这个Application的状态为NEW,然后监控这个应用,如果不是之前的NEW状态,则打印当前状态,如果Running状态则跳出这个循环,如果是其他状态,则抛出YarnDeploymentException异常,上层调用捕获处理吧,不然250ms判断一次 ***
    - depoly成功,钩子函数删除临时文件,如依赖的jar包和配置文件等,返回YarnClusterClient对象,包含了这YarnClusterDescriptor,ApplicationReport等重要的属性。
    ***
    ***deploy 成功以后进入交互模式,在runInteractiveCli里面最重要的一步是构造ApplicationClient Actor用于和JobManager Actor进行通信,但是如果发送 RegisterInfoMessageListener、UnRegisterInfoMessageListener等消息,将会由jobmanager actor将forward方法路由到flink resource manager actor去处理,此时jobmanager作为flink resource manager的代理,此时收到这两个消息的时候,由于是forward的方法,sender仍然是application client actor,所以flink manager resource actor可以直接给application client返回消息***
    ***
    > ------------ ** 3 代码展示主要流程**------
    ![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)*** ---- ApplicationClient 和JobManager Actor通信代码 --***
     
    ![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)## 2. YarnApplicationMasterRunner 启动流程分析
    *** RM首先分配一个NM的container去启动YarnApplicationMasterRunner ,接着下来我们看下是怎么做的***
    首先是进入main函数里面,构造一个YarnApplicationMasterRunner对象,直接调用其Run方法。
    > run方法主要步骤
    - 获取当前用户的UGI及远端UGI
    - 将当前用户ugi里面的token传递到远端的UGI中,用于数据和服务访问
    - 在远端的UGI里面执行runApplicationMaster启动ApplicationMaster
    ![](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)> runApplicationMaster主要过程,这里注释很清楚,我只捡重要的提示下
    - 1) load and parse / validate all configurations
    - 2) start the actor system,try to start the actor system, JobManager and JobManager actor system
    - 3) Generate the configuration for the TaskManagers,这里主要是JobManager的地址,taskManager注册的超时时间,slot个数,这里还有最重要的一步是构造TaskManager的ContainerLaunchContext,这个context里面包含了启动TaskManager的启动命令,***主类是YarnTaskManager***。
    -  start the actors and components in this order:  1) JobManager & Archive (in non-HA case, the leader service takes this),启动JobManagerActor,这里主类是***YarnJobManager***  2) Web Monitor (we need its port to register) 启动WEB监控页面,创建LeaderRetrievalService对象,这个主要用于启动TaskManager的时候,告诉TaskManager JobManager得akka url,用于TaskManager向JobManager进行注册。  3) Resource Master for YARN   启动YarnFlinkResourceManager Actor,这里主要用于Flink container资源的管理包括申请与释放等。  4) Process reapers for the JobManager and Resource Master
    ***这里主要介绍YarnApplicationMasterRunner 是如何通过YarnFlinkResourceManager去完成container的申请与启动TaskManager的,这里相对来说,比较复杂,我跟到Yarn的代码里才算整明白***
     
    ![YarnFlinkResourceManager的继承关系](?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)说明YarnFlinkResourceManager其实是一个actor,在runApplicationMaster方法中,通过下面的代码启动这个Actor复制代码
返回列表