Nodejs 使用 ZooKeeper 做服务发现

将单体服务拆分为微服务后,为了服务高可用,一般会做集群多实例。但在分布式下,怎么进行高效、便捷的进行服务访问问题,出现了各类服务注册和服务发现框架。这里使用的是Zookeeper。ZooKeeper 官网 https://zookeeper.apache.org

我们的业务系统使用的开发语言是JAVA,但是部分页面请求是先到nodejs 做的webportal服务,进行权限校验,校验通过后调用Java提供的API。当前阶段Java端已经微服务化,使用Zookeeper作为注册中心,目前只需要让nodejs端,也接入到Zookeeper,作为服务消费者,就能搭建机器环境。

找轮子

通过查找,发现npm有现成的库 node-zookeeper-client ,避免重复造轮子,就用它了。

接入思路

由于我们只是作为服务消费者,不需要使用服务注册的api,大部分可以直接在文档中找到API。

编码过程 

npm 安装

 1 npm i node-zookeeper-client 

连接ZK

 1 const Zookeeper = require('node-zookeeper-client');  2 const CONNECTION_STRING = "127.0.0.1:2181"; // ZK的服务地址  3 const OPTIONS = {  4   sessionTimeout: 5000   5 }  6 const zk = Zookeeper.createClient(CONNECTION_STRING, OPTIONS);  7   zk.on('connected', function(){  8   console.log("zk=====", zk);  9 }); 10 //获取根节点下的子节点数据 11 zk.getChildren('/', function(error, children, stat){ 12   if(error){ 13     console.log(error.stack); 14     return; 15   } 16   console.log(children); 17 }) 18 zk.connect();

其他API(仅供参考)

 1 // 判断节点是否已存在  2 zk.exists('/phpnode',function(error,stat){  3    if(stat){  4         console.log("节点存在");  5    }else{  6        console.log("节点不存在");  7    }  8 })  9   10  // 创建/注册节点 11 zk.create('/phpnode',new Buffer('hello'),function(error,path){ 12    console.log(path); 13 }) 14  15 // 获取节点数据 16 zk.getData('/phpnode',function(error,data,stat){ 17    console.log(data.toString()); 18 }); 19  20 //节点删除 21 zk.remove('/phpnode',function(error){ 22    if(!error){ 23        console.log('node 节点删除成功'); 24     } 25 })

于是有了第一版本代码

 1 const zookeeper = require('node-zookeeper-client');  2   3   4 // ZK基础配置信息,正式项目需要从环境文件导入  5 export const ZK = {  6     clientAddress: 'localhost:2181/zk/test', // ZK地址  7     servicePath: '/test-service', // 服务路径  8 };  9  10 let zkClient = null; 11  12 // 获取服务ip+port 13 export const getZKServiceBaseUrl = (servicePath) => { 14     return new Promise((resolve, reject) => { 15         try { 16             // 防止重复连接 17             if (zkClient) { 18                 disconnectZKService(); 19             } 20  21             // 新建连接 22             zkClient = zookeeper.createClient(ZK.clientAddress); 23             // 连接后执行一次 24             zkClient.once('connected', async function () { 25                 // 获取服务节点信息 26                 const res = await listChildren(zkClient, servicePath); 27                 res.message ? reject(res) : resolve(res); 28             }); 29  30             zkClient.connect(); 31         } catch (error) { 32             reject(error); 33         } 34     }); 35 }; 36  37 // 断开链接 38 export const disconnectZKService = () => { 39     if (zkClient) { 40         zkClient.close(); 41     } 42 }; 43  44 // 获取节点信息,ip+port 45 function listChildren(client, path) { 46     return new Promise((resolve, reject) => { 47         client.getChildren(path, 48             function () {}, 49             function (error, children) { 50                 if (error) { 51                     reject({ 52                         ...error, 53                         message: `获取ZK节点error,Path: ${path}` 54                     }); 55                 } 56                 try { 57                     let addressPath = path + '/'; 58                     if (children.length > 1) { 59                         //若存在多个地址,则随机获取一个地址 60                         addressPath += children[Math.floor(Math.random() * children.length)]; 61                     } else { 62                         //若只有唯一地址,则获取该地址 63                         addressPath += children[0]; 64                     } 65                     //获取服务地址 66                     client.getData(addressPath, function (err, data) { 67                         if (err) { 68                             reject({ 69                                 ...error, 70                                 message: `获取ZK服务地址error,Stack: ${err.stack}` 71                             }); 72                         } 73                         if (!data) { 74                             reject({ 75                                 ...error, 76                                 message: `ZK data is not exist` 77                             }); 78                         } 79                         const serviceInfo = JSON.parse(data); 80  81                         const url = serviceInfo.address + ':' + serviceInfo.port; 82                         resolve(url); 83                     }); 84                 } catch (error) { 85                     reject({ 86                         ...error, 87                         message: `list ZK children error` 88                     }); 89                 } 90             } 91         ); 92     }); 93 }

通过测试代码,可以实现调用Java服务。可能一般的程序员实现功能了就好了,可是作为一个有点追求的,感觉代码哪里有问题。具体是哪里呢,盯着屏幕瞅了两分钟,发现每次获取服务都取 ZK 注册中心获取,这个过程涉及到的网络请求而且还不是一次HTTP,如果只是这么简单的改造,程序单纯在性能响应上很有可能还不如老版本。我们可以在获取服务的真实远程地址前,添加一个本地缓存。通过ZK订阅机制,更新本地缓存数据。

思路虽然明确了,可以api扫了扫,没有我们想要的监听器,如下所示

Nodejs 使用 ZooKeeper 做服务发现

 

这怎么办,按理说的应该会有一个,节点数据改变推送的监听器,例如新增,删除,修改等等。找了半天也没找到合适的。

没办法,接着看源码吧,看了一会,忽然,看到一个似乎可用的,类

Nodejs 使用 ZooKeeper 做服务发现

 

 这不就是我需要的类吗,但是居然在一方法中注入监听器,先试试吧。

Nodejs 使用 ZooKeeper 做服务发现

 

 试了一下,嘿,真的可以了,当服务端节点数据发生变动后,会自动触发监听器 watcher 的回调逻辑。这就好办了,改造开始。

改进后的代码

const zookeeper = require('node-zookeeper-client'); var ZK = require('../config/env.js').zk;  const client = Object.freeze({     zkClient: zookeeper.createClient(ZK.connectionString),     serviceSet: [], //     serviceCache: Object.freeze({         map: new Map(),          /**          * 更新缓存          * @param {String} path 服务路径          * @param {Array<String>} arr 真实访问集合          */         updateCache: function (path, arr) {             this.map.set(path, arr);         },          /**          * 从缓存中获取访问地址          *           * @param {String} path 服务路径          * @returns String 真实访问地址          */         getRealPath: function (path) {             let arr = this.map.get(path);              if (arr.length > 1) //若存在多个地址,则随机获取一个地址                 return arr[Math.floor(Math.random() * arr.length)];             else //若只有唯一地址,则获取该地址                 return arr[0];         }     }),      connect: function () {         console.info("连接 zookeeper");          this.zkClient.once('connected', function () {             console.info("连接成功");         });          this.zkClient.connect();     },      getRealPath: function (serviceName) {         return new Promise(async (resolve, reject) => {             if (this.serviceSet.includes(serviceName)) {                 resolve(this.serviceCache.getRealPath(serviceName));             } else {                 // 加载服务节点信息                 this.loadChildren(serviceName).then(url => resolve(url)).catch(error => reject(error));             }         });     },      loadChildren: function (path) {         console.info("进入 loadChildren ");         return new Promise((resolve, reject) => {             this.zkClient.getChildren(path, (event) => {                 console.info(" loadChildren watcher ", path, event);                  this.getChildren(event.path);             }, (error, ids) => {                 console.info(" loadChildren callback ", path, error, ids);                 if (error) {                     reject({                         ...error,                         message: `获取ZK节点error,Path: ${path}`                     });                 } else {                     resolve(this.getData(path, ids));                 }             });         });     },      getChildren: function (path) {         console.info("进入 getChildren ");         return new Promise((resolve, reject) => {             this.zkClient.getChildren(path, (error, ids) => {                 console.info(" getChildren callback ", path, error, ids);                 if (error) {                     reject({                         ...error,                         message: `获取ZK节点error,Path: ${path}`                     });                 }                  resolve(this.getData(path, ids));             });         });     },      getData: function (path, ids) {         console.info("进入 getData ");          let pros = ids.map(id => new Promise((resolve, reject) => {             //获取服务地址             this.zkClient.getData(path + "/" + id, (error, data) => {                 console.info(" getData callback ", path, id);                 if (error) {                     reject({                         ...error,                         message: `获取ZK服务地址error,Stack: ${err.stack}`                     });                 }                 if (!data) {                     reject({                         ...error,                         message: `ZK data is not exist`                     });                 }                  const node = JSON.parse(data).payload;                 const protocol = node.ssl ? "https://" : "http://";                  resolve(`${protocol}${node.host}:${node.port}`);             });         }));          return Promise.all(pros).then(arr => this.serviceCache.updateCache(path, arr)).then(() => this.serviceCache.getRealPath(path));     },      disconnect: function () { //断开连接         console.info("进入 disconnect ")         if (this.zkClient) {             console.info("执行 close")             this.zkClient.close();         }     }, });  client.connect();  module.exports = {     getServiceUrl: (path) => client.getRealPath(path),     disconnect: () => client.disconnect(), }

这样终于,好一点了。

未完待续(多节点的选择问题)

多节点选择策略:随机,轮转,粘性 等等,一般不同的项目使用的策略也不太一样,实例中使用的是简单随机策略,后续再进行节点选择的策略问题优化啦。

关机,收工!!!

 

发表评论

相关文章