
Flink on Yarn Mode Startup Flow: Source Code Analysis (3)

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
    getResourceManagerClass(), // YarnFlinkResourceManager
    config,
    yarnConfig,
    leaderRetriever,
    appMasterHostname,
    webMonitorURL,
    taskManagerParameters,
    taskManagerContext,
    numInitialTaskManagers,
    LOG);
// Start the YarnFlinkResourceManager actor
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);

Next, look at the constructor of YarnFlinkResourceManager. Three member variables matter most here:

// The asynchronous AMRM client invokes this object's callbacks when YARN's RM responds,
// for example when containers have been allocated. resourceManagerCallbackHandler holds only
// this actor's ActorRef, so the callbacks can communicate with the actor.
/** Callback handler for the asynchronous resourceManagerClient */
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
// Client for AM-to-RM communication; resourceManagerClient holds resourceManagerCallbackHandler.
/** Client to communicate with the Resource Manager (YARN's master) */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
// Client for AM-to-NM communication.
/** Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;

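When YarnFlinkResourceManager starts, its preStart method runs first. The class does not implement preStart itself, so the preStart of its parent class FlinkResourceManager is executed, and then the initialize method of YarnFlinkResourceManager is called.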
***Inside the initialize method:***

    resourceManagerClient.start() -->
    AMRMClientAsyncImpl.serviceStart() -->
    CallbackHandlerThread.start() (daemon thread) -->
    YarnResourceManagerCallbackHandler.onContainersAllocated(allocated) -->
    yarnFrameworkMaster.tell(new ContainersAllocated(containers), ActorRef.noSender()) (yarnFrameworkMaster is the ActorRef of YarnFlinkResourceManager) -->
    YarnFlinkResourceManager.containersAllocated -->
    NMClient.startContainer(container, taskManagerLaunchContext)

At this point each NM has been told to start its container.
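
The callback chain above can be sketched without YARN or Akka on the classpath. What follows is a minimal, standard-library-only illustration, not Flink's code. `ActorMailbox`, `CallbackHandler`, and `AsyncClient` are hypothetical stand-ins for the YarnFlinkResourceManager ActorRef, YarnResourceManagerCallbackHandler, and AMRMClientAsync. Only the names `onContainersAllocated` and `ContainersAllocated` are taken from the chain, and container IDs are assumed to be plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CallbackFlowSketch {

    /** Message delivered to the actor; mirrors the ContainersAllocated message in the chain. */
    static final class ContainersAllocated {
        final List<String> containers;
        ContainersAllocated(List<String> containers) { this.containers = containers; }
    }

    /** Hypothetical stand-in for an ActorRef: all a caller can do is send it messages. */
    static final class ActorMailbox {
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        void tell(Object message) { queue.add(message); }
        Object receive(long timeoutMs) throws InterruptedException {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Holds nothing but the actor reference, so a callback can only forward a message. */
    static final class CallbackHandler {
        private final ActorMailbox actor;
        CallbackHandler(ActorMailbox actor) { this.actor = actor; }
        void onContainersAllocated(List<String> allocated) {
            actor.tell(new ContainersAllocated(allocated));
        }
    }

    /** Hypothetical async client: start() spawns a daemon thread that fires the callback. */
    static final class AsyncClient {
        private final CallbackHandler handler;
        AsyncClient(CallbackHandler handler) { this.handler = handler; }
        void start(List<String> simulatedAllocation) {
            Thread t = new Thread(
                    () -> handler.onContainersAllocated(simulatedAllocation),
                    "callback-handler");
            t.setDaemon(true);
            t.start();
        }
    }

    /** Runs the whole chain and returns the containers the "actor" would launch. */
    static List<String> run(List<String> allocation) {
        ActorMailbox actor = new ActorMailbox();
        AsyncClient client = new AsyncClient(new CallbackHandler(actor));
        client.start(allocation);

        List<String> launched = new ArrayList<>();
        Object msg;
        try {
            msg = actor.receive(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return launched;
        }
        if (msg instanceof ContainersAllocated) {
            // In Flink this is the point where NMClient.startContainer(...) is called.
            launched.addAll(((ContainersAllocated) msg).containers);
        }
        return launched;
    }

    public static void main(String[] args) {
        System.out.println("launching: " + run(Arrays.asList("container_01", "container_02")));
    }
}
```

The point of the design is that the callback thread never touches the actor's state directly. It only enqueues a message, and the actor processes it on its own thread.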
At this point the important parts of the YarnApplicationMasterRunner flow have been covered. There are too many details to go through here, so they are left for another time. Next comes the YarnTaskManager part.

## 3. YarnTaskManager startup flow analysis

Continuing from above, nodeManagerClient.startContainer(container, taskManagerLaunchContext) tells the NM to start a container. Using the launch information in taskManagerLaunchContext, the NM downloads from HDFS the jars and configuration files that YarnTaskManager needs at startup. The container's working directory then contains:
(container_tokens  default_container_executor_session.sh  default_container_executor.sh  flink-conf.yaml  flink.jar  launch_container.sh  lib  log4j.properties  logback.xml)

The NM then runs launch_container.sh in a shell, which ultimately starts the YarnTaskManager process with java -cp. When the process starts, it first executes the YarnTaskManager run method. The TaskManager obtains the JobManager's akka address and sends it a registration message. Once the JobManager has received the message and registration succeeds, it sends an acknowledgement back to the TaskManager. The TaskManager then uses its configuration and the information returned by the JobManager to build the member variables that do the real work. The call sequence:

    YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager]) -->
    TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager) -->
    TaskManager.runTaskManager -->
    TaskManager.startTaskManagerComponentsAndActor -->
    actorSystem.actorOf(tmProps, actorName) -->
    TaskManager.preStart -->
    StandaloneLeaderRetrievalService.start(TaskManager) -->
    TaskManager.notifyLeaderAddress -->
    TaskManager.handleJobManagerLeaderAddress -->
    TaskManager.triggerTaskManagerRegistration() -->
    TaskManager.handleRegistrationMessage -->
    instanceManager.registerTaskManager -->
    JobManager sends an AcknowledgeRegistration message to the TaskManager -->
    TaskManager.associateWithJobManager
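
To make the handshake at the end of this chain concrete, here is a minimal single-threaded sketch in plain Java. It is not Flink's implementation. The classes `JobManagerSketch` and `TaskManagerSketch`, their fields, and the integer instance ID are hypothetical, and `RegisterTaskManager` is used here only as a label for the registration message. `AcknowledgeRegistration`, `registerTaskManager`, and `associateWithJobManager` follow the names in the chain.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistrationSketch {

    /** Registration request carrying the TaskManager's resource ID. */
    static final class RegisterTaskManager {
        final String resourceId;
        RegisterTaskManager(String resourceId) { this.resourceId = resourceId; }
    }

    /** Acknowledgement sent back by the JobManager. */
    static final class AcknowledgeRegistration {
        final int instanceId;
        AcknowledgeRegistration(int instanceId) { this.instanceId = instanceId; }
    }

    /** Hypothetical JobManager: records each TaskManager once and replies with an ack. */
    static final class JobManagerSketch {
        private final Map<String, Integer> registered = new HashMap<>();
        private int nextInstanceId = 1;

        AcknowledgeRegistration handle(RegisterTaskManager msg) {
            // Plays the role of instanceManager.registerTaskManager in the chain above.
            Integer id = registered.get(msg.resourceId);
            if (id == null) {
                id = nextInstanceId++;
                registered.put(msg.resourceId, id);
            }
            return new AcknowledgeRegistration(id);
        }

        int registeredCount() { return registered.size(); }
    }

    /** Hypothetical TaskManager: unassociated until it receives the ack. */
    static final class TaskManagerSketch {
        private final String resourceId;
        private Integer instanceId; // null means "not yet associated"

        TaskManagerSketch(String resourceId) { this.resourceId = resourceId; }

        void triggerRegistration(JobManagerSketch jobManager) {
            AcknowledgeRegistration ack =
                    jobManager.handle(new RegisterTaskManager(resourceId));
            associateWithJobManager(ack);
        }

        private void associateWithJobManager(AcknowledgeRegistration ack) {
            // The real TaskManager would build its working components at this point.
            this.instanceId = ack.instanceId;
        }

        boolean isAssociated() { return instanceId != null; }
        Integer instanceId() { return instanceId; }
    }

    public static void main(String[] args) {
        JobManagerSketch jm = new JobManagerSketch();
        TaskManagerSketch tm = new TaskManagerSketch("container_01");
        tm.triggerRegistration(jm);
        System.out.println("associated=" + tm.isAssociated()
                + ", instanceId=" + tm.instanceId());
    }
}
```

In the real system these steps are asynchronous actor messages. The sketch collapses them into direct method calls only to keep the order of the steps visible.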
     
     
     
That is basically the whole Flink on Yarn flow. The details deserve a deeper look, and if anything here is incorrect, corrections are welcome.