大多数时候我们使用的应用比如MapReduce,Spark都是构建在Yarn之上的。我们并不需要自己编写Yarn的应用程序。但是了解一下一个程序在Yarn里是如何运行的,在你更好的理解和使用MapReduce,Spark,以及对它们进行调优都是很有帮助的。

Client如何提交一个Yarn程序

客户程序通过YarnClient向ResourceManager提交一个Applicaiton。

具体过程如下:
1. 生成一个YarnClient实例,并启动它。


  YarnClient yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);
  yarnClient.start();

2.创建一个Application,并获取它的ID。将来我们查询这个Applicaiton的时候需要用到ID。


  YarnClientApplication app = yarnClient.createApplication();
    ApplicationId appId = appContext.getApplicationId();

3.创建一个ContainerLaunchContext,它是用来描述如何启动Applicaiton Master的container的。它主要包括它依赖的本地资源,环境变量,以及运行的命令。


  ContainerLaunchContext am_containerLaunchContext = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);

下边是一个完整的ContainerLaunchContext配置需要的参数。


 public static ContainerLaunchContext newInstance(
      Map<String, LocalResource> localResources,
      Map<String, String> environment, List<String> commands,
      Map<String, ByteBuffer> serviceData,  ByteBuffer tokens,
      Map<ApplicationAccessType, String> acls)

4.获得之前创建的Application的ApplicationSubmissionContext,并配置它


 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
 appContext.setApplicationName(appName);
 appContext.setAMContainerSpec(am_containerLaunchContext);
 Resource capability = Resource.newInstance(amMemory, amVCores);
 appContext.setResource(capability);
 // Set the priority for the application master
 Priority pri = Priority.newInstance(amPriority);
 appContext.setPriority(pri);
 // Set the queue to which this application is to be submitted in the RM
 appContext.setQueue(amQueue);

5.提交这个Applicaiton


yarnClient.submitApplication(appContext);

6.通过ApplicationReport获取Applicaiton的进度。


ApplicationReport report = yarnClient.getApplicationReport(appId);

7.终止Applicaiton


yarnClient.killApplication(appId);

通过上边的步骤我们就可以提交一个Application 给 Yarn的ResourceManager。Yarn会根据我们的capability设置,在集群里创建一个Container,然后根据我们的设置的ContainerLaunchContext起启动我们的Applicaiton Master。接下来我们看一个Applicaiton Master需要做些什么。

一个Application Master(AM)都做些什么

AM是一个Yarn job的实际所有和管理者。通过这篇文章的前半部分我们知道它是Client通过ResourceManager(RM)提交并创建的。
AM的进程运行起来后,它会做一下事情:
1. 启动两个client,一个AMRMClient负责和RM通讯,一个NMClient负责和一组NodeManager(NM)来通讯。


  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();
  1. 向RM注册自己这个AM

appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasteRpcPort,
        appMasterTrackingUrl);

3.向RM请求资源来执行自己的Task


  Resource capability = Resource.newInstance(containerMemory,
    containerVirtualCores);
  ContainerRequest request = new ContainerRequest(capability, null, null, pri);
    amRMClient.addContainerRequest(containerAsk);

4.RM分配好AM需要的container后,会通过AMRMClientAsync.CallbackHandler 回调,我们需要在回调函数内给这些分配好的container配置它们的ContainerLaunchContext,然后通过nmClientAsync来提交。我们在之前client给RM提交Applicaiton时,也给AM的container准备了ContainerLaunchContext。可以启动container是由AM来控制的,如果RM分配了container给AM,但是AM长时间不使用,是会被RM收回的。


@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
    for (Container allocatedContainer : allocatedContainers) {
        containerListener.addContainer(container.getId(), container);
        nmClientAsync.startContainerAsync(container, ctx);
  }
}

5.当job执行完成的时候,向RM注销自己,并关闭amRMClient


amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
amRMClient.stop();

最后我们通过一张图看一下一个Yarn程序运行时各个进程之间的交互:

发表评论

电子邮件地址不会被公开。 必填项已用*标注

%d 博主赞过: