本篇继续对 RM 中管理 NodeManager 的部分进行深入的讲解。主要有三个部分:检查 NM 是否存活;管理 NM 的黑白名单;响应 NM RPC 请求。
一、简介
在 RM 的主从结构中,最主要的就是 RM 和 NM 之间的主从结构。RM 作为「管理员」,管理下面多个 NM 节点。如何检测 NM 是否存活,划分黑白名单,以及如何相应 NM RPC 请求,将在下面进行详细介绍。
二、NM 管理主要组成
一)NMLivelinessMonitor
- 检测 NM 活性的服务,是否有心跳
- 当前正在运行的 NM 会保存在 RM 的一个数据结构中,NMLivelinessMonitor 就周期遍历,若一个 NM 在一定时间(默认10分钟)未汇报心跳,则任务其挂了
- NM 被认为挂了后,其上的 Container 会自动置为运行失败,并通知给 AM,由 AM 决定后续处理方案
二)NodesListManager
- 管理 exclude(类似黑名单)和 include(类似白名单)列表
- 启动时,它们分别从
yarn.resourcemanager.nodes.include-path
以及yarn.resourcemanager.nodes.exclude-path
中读取 - 黑名单列表中的nodes不能够和RM直接通信(直接抛出RPC异常)
- 可以动态加载,使用命令
yarn rmadmin -refreshNodes
三)ResourceTrackerService
ResourceTrackerService
是 RPC 协议 ResourceTracker
的一个实现,它作为一个 RPC Server 端接收 NodeManager 的 RPC 请求。
请求主要包含2种信息,1)注册NodeManager。2)处理心跳信息。
- 注册 NodeManager 处理:
ResourceTrackerService#registerNodeManager
// ResourceTrackerService#registerNodeManager public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { // nm 节点信息 - ip、端口、资源、版本 NodeId nodeId = request.getNodeId(); String host = nodeId.getHost(); int cmPort = nodeId.getPort(); int httpPort = request.getHttpPort(); Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); // 检查版本 if (!minimumNodeManagerVersion.equals("NONE")) { if (minimumNodeManagerVersion.equals("EqualToRM")) { minimumNodeManagerVersion = YarnVersionInfo.getVersion(); } if ((nodeManagerVersion == null) || (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) { String message = "Disallowed NodeManager Version " + nodeManagerVersion + ", is less than the minimum version " + minimumNodeManagerVersion + " sending SHUTDOWN signal to " + "NodeManager."; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } }
- 处理心跳
- 可以到源码中查看具体逻辑
ResourceTracker#nodeHeart
- 接收并检查 nm 汇报的心跳信息
- 在
NodeHeartbeatResponse
中 set 需要释放的 Container 列表、Application 列表等信息 - 向 RMNode 发送该 NodeManager 的状态信息并且保存最近一次心跳应答信息
- 返回 `NodeHeartbeatResponse
- 可以到源码中查看具体逻辑