大纲
1.zk实现数据发布订阅
2.zk实现负载均衡
3.zk实现分布式命名服务
4.zk实现分布式协调(Master-Worker协同)
5.zk实现分布式通信
6.zk实现Master选举
7.zk实现分布式锁
8.zk实现分布式队列和分布式屏障
1.zk实现数据发布订阅
(1)发布订阅系统一般有推模式和拉模式
(2)zk采用了推拉相结合来实现发布订阅
(3)使用zk来实现发布订阅总结
(4)zk原生实现分布式配置的示例(也就是实现注册发现或者数据发布订阅)
(1)发布订阅系统一般有推模式和拉模式
推模式:服务端主动将更新的数据发送给所有订阅的客户端。
拉模式:客户端主动发起请求来获取最新数据(定时轮询拉取)。
(2)zk采用了推拉相结合来实现发布订阅
首先客户端需要向服务端注册自己关注的节点(添加Watcher事件)。一旦该节点发生变更,服务端就会向客户端发送Watcher事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。所以,zk的Watcher机制有一个缺点就是:客户端不能定制服务端回调,需要客户端收到Watcher通知后再次向服务端发起请求获取数据,多进行一次网络交互。
如果将配置信息放到zk上进行集中管理,那么:
一.应用启动时需主动到zk服务端获取配置信息,然后在指定节点上注册一个Watcher监听。
二.只要配置信息发生变更,zk服务端就会实时通知所有订阅的应用,让应用能实时获取到订阅的配置信息节点已发生变更的消息。
注意:原生zk客户端可以通过getData()、exists()、getChildren()三个方法,向zk服务端注册Watcher监听,而且注册的Watcher监听具有一次性,所以zk客户端获得服务端的节点变更通知后需要再次注册Watcher。
(3)使用zk来实现数据发布订阅总结
步骤一:将配置信息存储到zk的节点上。
步骤二:应用启动时首先从zk节点上获取配置信息,然后再向该zk节点注册一个数据变更的Watcher监听。一旦该zk节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到zk服务端发过来的数据变更通知后重新获取最新数据。
(4)zk原生实现分布式配置(也就是实现注册发现或者数据发布订阅)
配置可以使用数据库、Redis、或者任何一种可以共享的存储位置。使用zk的目的,主要就是利用它的回调机制。任何zk的使用方不需要去轮询zk,Redis或者数据库可能就需要主动轮询去看看数据是否发生改变。使用zk最大的优势是只要对数据添加Watcher,数据发生修改时zk就会回调指定的方法。注意:new一个zk实例和向zk获取数据都是异步的。
如下的做法其实是一种Reactor响应式编程:使用CoundownLatch阻塞及通过调用一次数据来触发回调更新本地的conf。我们并没有每个场景都线性写一个方法堆砌起来,而是用相应的回调和Watcher事件来粘连起来。其实就是把所有事件发生前后要做的事情粘连起来,等着回调来触发。
一.先定义一个工具类可以获取zk实例
public class ZKUtils { private static ZooKeeper zk; private static String address = "192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181/test"; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); private static CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper getZK() { try { zk = new ZooKeeper(address, 1000, defaultWatcher); defaultWatcher.setCountDownLatch(countDownLatch); //阻塞直到建立好连接拿到可用的zk countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } return zk; } }
二.定义和zk建立连接时的Watcher
public class DefaultWatcher implements Watcher { CountDownLatch countDownLatch ; public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent event) { System.out.println(event.toString()); switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } } }
三.定义分布式配置的核心类WatcherCallBack
这个WatcherCallBack类不仅实现了Watcher,还实现了两个异步回调。
首先通过zk.exists()方法判断配置的znode是否存在并添加监听(自己) + 回调(自己),然后通过countDownLatch.await()方法进行阻塞。
在回调中如果发现存在配置的znode,则设置配置并执行countDown()方法不再进行阻塞。
在监听中如果发现数据变化,则会调用zk.getData()方法获取配置的数据,并且获取配置的数据时也会继续监听(自己) + 回调(自己)。
public class WatcherCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { ZooKeeper zk; MyConf conf; CountDownLatch countDownLatch = new CountDownLatch(1); public MyConf getConf() { return conf; } public void setConf(MyConf conf) { this.conf = conf; } public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } //判断配置是否存在并监听配置的znode public void aWait(){ zk.exists("/AppConf", this, this ,"ABC"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //回调自己,这是执行完zk.exists()方法或者zk.getData()方法的回调 //在回调中如果发现存在配置的znode,则设置配置并执行countDown()方法不再进行阻塞。 @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (data != null) { String s = new String(data); conf.setConf(s); countDownLatch.countDown(); } } //监听自己,这是执行zk.exists()方法或者zk.getData()方法时添加的Watcher监听 //在监听中如果发现数据变化,则会继续调用zk.getData()方法获取配置的数据,并且获取配置的数据时也会继续监听(自己) + 回调(自己) @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (stat != null) {//stat不为空, 代表节点已经存在 zk.getData("/AppConf", this, this, "sdfs"); } } @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: //调用一次数据, 这会触发回调更新本地的conf zk.getData("/AppConf", this, this, "sdfs"); break; case NodeDeleted: //容忍性, 节点被删除, 把本地conf清空, 并且恢复阻塞 conf.setConf(""); countDownLatch = new CountDownLatch(1); break; case NodeDataChanged: //数据发生变更, 需要重新获取调用一次数据, 这会触发回调更新本地的conf zk.getData("/AppConf", this, this, "sdfs"); break; case NodeChildrenChanged: break; } } }
分布式配置的核心配置类:
//MyConf是配置中心的配置 public class MyConf { private String conf ; public String getConf() { return conf; } public void setConf(String conf) { this.conf = conf; } }
四.通过WatcherCallBack的方法判断配置是否存在并尝试获取数据
public class TestConfig { ZooKeeper zk; public void conn () { zk = ZKUtils.getZK(); } public void close () { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void getConf() { WatchCallBack watchCallBack = new WatchCallBack(); //传入zk和配置conf watchCallBack.setZk(zk); MyConf myConf = new MyConf(); watchCallBack.setConf(myConf); //节点不存在和节点存在, 都尝试去取数据, 取到了才往下走 watchCallBack.aWait(); while(true) { if (myConf.getConf().equals("")) { System.out.println("conf diu le ......"); watchCallBack.aWait(); } else { System.out.println(myConf.getConf()); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }
2.zk实现负载均衡
(1)负载均衡算法
(2)使用zk来实现负载均衡
(1)负载均衡算法
常用的负载均衡算法有:轮询法、随机法、原地址哈希法、加权轮询法、加权随机法、最小连接数法。
一.轮询法
轮询法是最为简单的负载均衡算法。当接收到客户端请求后,负载均衡服务器会按顺序逐个分配给后端服务。比如集群中有3台服务器,分别是server1、server2、server3,轮询法会按照sever1、server2、server3顺序依次分发请求给每个服务器。当第一次轮询结束后,会重新开始下一轮的循环。
二.随机法
随机法是指负载均衡服务器在接收到来自客户端请求后,根据随机算法选中后台集群中的一台服务器来处理这次请求。由于当集群中的机器变得越来越多时,每台机器被抽中的概率基本相等,因此随机法的实际效果越来越趋近轮询法。
三.原地址哈希法
原地址哈希法是根据客户端的IP地址进行哈希计算,对计算结果进行取模,然后根据最终结果选择服务器地址列表中的一台机器来处理请求。这种算法每次都会分配同一台服务器来处理同一IP的客户端请求。
四.加权轮询法
由于一个分布式系统中的机器可能部署在不同的网络环境中,每台机器的配置性能各不相同,因此其处理和响应请求的能力也各不相同。
如果采用上面几种负载均衡算法,都不太合适。这会造成能力强的服务器在处理完业务后过早进入空闲状态,而性能差或网络环境不好的服务器一直忙于处理请求造成任务积压。
为了解决这个问题,可以采用加权轮询法。加权轮询法的方式与轮询法的方式很相似,唯一的不同在于选择机器时,不只是单纯按照顺序的方式去选择,还要根据机器的配置和性能高低有所侧重,让配置性能好的机器优先分配。
五.加权随机法
加权随机法和上面提到的随机法一样,在采用随机法选举服务器时,会考虑系统性能作为权值条件。
六.最小连接数法
最小连接数法是指:根据后台处理客户端的请求数,计算应该把新请求分配给哪一台服务器。一般认为请求数最少的机器,会作为最优先分配的对象。
(2)使用zk来实现负载均衡
一.状态收集之实现zk的业务服务器列表
二.请求分配之如何选择业务服务器
实现负载均衡服务器的关键是:探测和发现业务服务器的运行状态 + 分配请求给最合适的业务服务器。
一.状态收集之实现zk的业务服务器列表
首先利用zk的临时子节点来标记业务服务器的状态。在业务服务器上线时:通过向zk服务器创建临时子节点来实现服务注册,表示业务服务器已上线。在业务服务器下线时:通过删除临时节点或者与zk服务器断开连接来进行服务剔除。最后通过统计临时节点的数量,来了解业务服务器的运行情况。
在代码层面的实现中,首先定义一个BlanceSever接口类。该类用于业务服务器启动或关闭后:第一.向zk服务器地址列表注册或注销服务,第二.根据接收到的请求动态更新负载均衡情况。
class BlanceSever { //向zk服务器地址列表进行注册服务 void register() //向zk服务器地址列表进行注销服务 void unregister() //根据接收到的请求动态更新负载均衡情况 void addBlanceCount() void takeBlanceCount() }
之后创建BlanceSever接口的实现类BlanceSeverImpl,在BlanceSeverImpl类中首先定义:业务服务器运行的Session超时时间、会话连接超时时间、zk客户端地址、服务器地址列表节点SERVER_PATH等基本参数。并通过构造函数,在类被引用时进行初始化zk客户端对象实例。
public class BlanceSeverImpl implements BlanceSever { private static final Integer SESSION_TIME_OUT; private static final Integer CONNECTION_TIME_OUT; private final ZkClient zkclient; private static final SERVER_PATH = "/Severs"; public BlanceSeverImpl() { init... } }
接下来定义,业务服务器启动时,向zk注册服务的register方法。
在如下代码中,会通过在SERVER_PATH路径下创建临时子节点的方式来注册服务。首先获取业务服务器的IP地址,然后利用IP地址作为临时节点的path来创建临时节点。
public register() throws Exception { //首先获取业务服务器的IP地址 InetAddress address = InetAddress.getLocalHost(); String serverIp = address.getHostAddress(); //然后利用IP地址作为临时节点的path来创建临时节点 zkclient.createEphemeral(SERVER_PATH + serverIp); }
接下来定义,业务服务器关机或不对外提供服务时的unregister()方法。通过调用unregister()方法,注销该台业务服务器在zk服务器列表中的信息。注销后的机器不会被负载均衡服务器分发处理会话。在如下代码中,会通过删除SERVER_PATH路径下临时节点的方式来注销业务服务器。
public unregister() throws Exception { zkclient.delete(SERVER_PATH + serverIp); }
二.请求分配之如何选择业务服务器
以最小连接数法为例,来确定如何均衡地分配请求给业务服务器,整个实现步骤如下:
步骤一:首先负载均衡服务器在接收到客户端的请求后,通过getData()方法获取已成功注册的业务服务器列表,也就是"/Servers"节点下的各个临时节点,这些临时节点都存储了当前服务器的连接数。
步骤二:然后选取连接数最少的业务服务器作为处理当前请求的业务服务器,并通过setData()方法将该业务服务器对应的节点值(连接数)加1。
步骤三:当该业务服务器处理完请求后,调用setData()方法将该节点值(连接数)减1。
下面定义,当业务服务器接收到请求后,增加连接数的addBlance()方法。在如下代码中,首先通过readData()方法获取服务器最新的连接数,然后将该连接数加1。接着通过writeData()方法将最新的连接数写入到该业务服务器对应的临时节点。
public void addBlance() throws Exception { InetAddress address = InetAddress.getLocalHost(); String serverIp = address.getHostAddress(); Integer con_count = zkClient.readData(SERVER_PATH + serverIp); ++con_count; zkClient.writeData(SERVER_PATH + serverIp, con_count); }
3.zk实现分布式命名服务
(1)ID编码的特性
(2)通过UUID方式生成分布式ID
(3)通过TDDL生成分布式ID
(4)通过zk生成分布式ID
(5)SnowFlake算法
命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体可以是集群中的机器、提供的服务地址等。例如,Java中的JNDI便是一种典型的命名服务。
(1)ID编码的特性
分布式ID生成器就是通过分布式的方式,自动生成ID编码的程序或服务。生成的ID编码一般具有唯一性、递增性、安全性、扩展性这几个特性。
(2)通过UUID方式生成分布式ID
UUID能非常简便地保证分布式环境中ID的唯一性。它由32位字符和4个短线字符组成,比如e70f1357-f260-46ff-a32d-53a086c57ade。
由于UUID在本地应用中生成,所以生成速度比较快,不依赖其他服务和网络。但缺点是:长度过长、含义不明、不满足递增性。
(3)通过TDDL生成分布式ID
MySQL的自增主键是一种有序的ID生成方式,还有一种性能更好的数据库序列生成方式:TDDL中的ID生成方式。TDDL是Taobao Distributed Data Layer的缩写,是一种数据库中间件,主要应用于数据库分库分表的应用场景中。
TDDL生成ID编码的大致过程如下:首先数据库中有一张Sequence序列化表,记录当前已被占用的ID最大值。然后每个需要ID编码的客户端在请求TDDL的ID编码生成器后,TDDL都会返回给该客户端一段ID编码,并更新Sequence表中的信息。
客户端接收到一段ID编码后,会将该段编码存储在内存中。在本机需要使用ID编码时,会首先使用内存中的ID编码。如果内存中的ID编码已经完全被占用,则再重新向编码服务器获取。
TDDL通过分批获取ID编码的方式,减少了客户端访问服务器的频率,避免了网络波动所造成的影响,并减轻了服务器的内存压力。不过TDDL高度依赖数据库,不能作为独立的分布式ID生成器对外提供服务。
(4)通过zk生成分布式ID
每个需要ID编码的业务服务器可以看作是zk的客户端,ID编码生成器可以看作是zk的服务端,可以利用zk数据模型中的顺序节点作为ID编码。
客户端通过create()方法来创建一个顺序子节点。服务端成功创建节点后会响应客户端请求,把创建好的节点发送给客户端。客户端以顺序节点名称为基础进行ID编码,生成ID后就可以进行业务操作。
(5)SnowFlake算法
SnowFlake算法是Twitter开源的一种用来生成分布式ID的算法,通过SnowFlake算法生成的编码会是一个64位的二进制数。
第一个bit不用,接下来的41个bit用来存储毫秒时间戳,再接下来的10个bit用来存储机器ID,剩余的12个bit用来存储流水号和0。
