上篇文章介绍了编写 Yarn Application 的整体框架流程,本篇文章将详细介绍其中 Client 部分的编写方式。
一、Yarn Client 编写方法
本篇代码已上传 Github:
Github - MyYarnClient
一)编写流程
1、创建并启动 Client
YarnClient 内容通过 ApplicationClientProtocol 与 ResourceManager 通信,向 RM 的ApplicationsManager
申请 Application。
跟踪进去可以在 YarnClientImpl
找到 rpc:
this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();
2、通过YarnClient 创建 Application
GetNewApplicationResponse
中除了包含 ApplicationId
,还包括集群最大/最小资源,给任务启动设置的资源作参考。
YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
3、关键:完善 ApplicationSubmissionContext
需要在 ApplicationSubmissionContext
中定义 RM 启动 AM 时所需的全部信息,主要包括:
- app 信息:id,name
- 队列、优先级信息
- 提交用户
- ContainerLaunchContext:定义 AM 启动所需信息
- RECT
- Resources (binaries, jars, files etc.):其中包括 Application master jar
- Environment settings (CLASSPATH etc.)
- Command to be executed
- security Tokens
- RECT
// 3 完善 ApplicationSubmissionContext 所需内容 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId applicationId = appContext.getApplicationId(); // 3.1 设置application name appContext.setApplicationName("my-test-app"); // 3.2 设置ContainerLaunchContext // localResources, env, commands 等 // application master 的 jar 放到 localResources 中 // 这部分较长省略,请到代码中查看 ContainerLaunchContext amContainerCtx = createAMContainerLaunchContext( conf, app.getApplicationSubmissionContext().getApplicationId()); appContext.setAMContainerSpec(amContainerCtx); // 3.3 设置优先级 Priority pri = Priority.newInstance(0); appContext.setPriority(pri); // 3.4 设置队列 appContext.setQueue("default"); // 3.5 设置 am 资源 int amMemory = 2048; int amVCores = 2; Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability);
4、提交 Application
提交后,RM 接收到 Application,根据资源请求分配容器,最终将 AM 启动在容器中。
这里交给 YarnClientImpl
执行 rmClient.submitApplication(request)
,通过 RPC ApplicationClientProtocol
提交到 RM
ApplicationId appId = yarnClient.submitApplication(appContext);
5、获取任务进度信息
ApplicationReport report = yarnClient.getApplicationReport(appId); log.info("Got application report " + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime=" + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString() + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser());
6、kill Application
当 Application 运行了过长的时间或者其他的原因,client 可以 kill application。
流程是:client 像 RM 发送 kill 信号,再传递给 AM
yarnClient.killApplication(appId);
二)涉及的通信协议
参考文章:
Hadoop: Writing YARN Applications - Writing a simple Client
《Hadoop 技术内幕 - 深入解析 Yarn 结构设计与实现原理》第四章