循序渐进 Redis 分布式锁(以及何时不用它)

场景

假设我们有个批处理服务,实现逻辑大致是这样的:

  1. 用户在管理后台向批处理服务投递任务;
  2. 批处理服务将该任务写入数据库,立即返回;
  3. 批处理服务有启动单独线程定时从数据库获取一批未处理(或处理失败)的任务,投递到消息队列中;
  4. 批处理服务启动多个消费线程监听队列,从队列中拿到任务并处理;
  5. 消费线程处理完成(成功或者失败)后修改数据库中相应任务的状态;

流程如图:

循序渐进 Redis 分布式锁(以及何时不用它)

现在我们单独看看上图中虚线框中的内容(3~6):批处理服务从数据库拉取任务列表投递到消息队列。

生产环境中,为了高可用,都会部署至少两台批处理服务器,也就是说至少有两个进程在执行虚线框中的流程。

有什么问题呢?

假设这两个进程同时去查任务表(这是很有可能的),它俩很可能会得到同一批任务列表,于是这批任务都会入列两次。

当然,这不是说一个任务入列两次就一定会导致任务被重复执行——我们可以通过多引入一个状态值来解决此问题。

消费者线程从队列中获取到任务后,再次用如下 SQL 更新任务状态:

-- status:1-待处理;2-已入列;3-处理中;4-失败待重试;5-彻底失败(不可重试);

update tasks set status=3 where status=2 and id=$id;

由于 where 条件有 status=2,即只有原先状态是“已入列”的才能变成“处理中”,如果多个线程同时拿到同一个任务,一定只有一个线程能执行成功上面的语句,进而继续后续流程(其实这就是通过数据库实现的简单的分布式锁——乐观锁)。

不过,当定时进程多了后,大量的重复数据仍然会带来性能等其他问题,所以有必要解决重复入列的问题。

有个细节:请注意上图中步骤 5、6,是先改数据库状态为“已入列”,再将消息投递到消息队列中——这和常规逻辑是反过来的。

能否颠倒 5 和 6 的顺序,先入列,再改数据库状态呢?

不能。从逻辑上来说确实应该如此,但它会带来问题。消费线程从队列中拿到任务后,会执行如下 SQL 语句:

update tasks set status=3 where status=2 and id=$id;

这条 SQL 依赖于前面(第 5 步)产生的状态值,所以它要求在执行该语句的时候,第 5 步的 SQL 语句(将状态改为“已入列”)一定已经执行完了。如果将 5 和 6 颠倒(先入列,再改状态值),就有可能出现下图的执行顺序,导致消费者线程修改状态失败,进而执行不下去:

循序渐进 Redis 分布式锁(以及何时不用它)

上图中,任务入列后立即被消费线程获取到并去修改数据库,而此时定时线程的 SQL 可能还没执行(可能网络延迟),这就出问题了。

定时线程先将状态改为“已入列”带来的问题是,如果改状态后(入列前)进程挂了,会导致任务一直处于已入列状态(但实际上未入列),所以还需要搭配其它的超时重试机制。

上图虚线框中那段逻辑在并发原语中有个专门名称叫“临界区”——我们要做的就是让多个操作者(进程、线程、协程)必须一个一个地(而不能一窝蜂地)去执行临界区内部的逻辑,手段就是加锁:

var lock = newLock()  // 加锁 lock.lock()  // 执行临界区的逻辑  // 释放锁 lock.unlock() 

所谓锁,就是多个参与者(进程、线程)争抢同一个共享资源(术语叫“信号量”),谁抢到了就有资格往下走,没抢到的只能乖乖地等(或者放弃)。锁的本质是两点:

  1. 它是一种共享资源,对于多方参与者来说,只有一个,就好比篮球场上只有一个篮球,所有人都抢这一个球;
  2. 对该资源的操作(加锁、解锁)是原子性的。虽然大家一窝蜂都去抢一个球,但最终这个球只会属于某一个人,不可能一半在张三手上,另一半在李四手上。只有抢到球的一方才可以执行后续流程(投篮),另一方只能继续抢;

在单个进程中,以上两点很容易实现:同一个进程中的线程之间天然是共享进程内存空间的;原子性也直接由 CPU 指令保证。所以单个进程中,我们直接用编程语言提供的锁即可。

进程之间呢?

进程之间的内存空间是独立的。两个进程(可能在两台不同的物理机上)创建的锁资源自然也是独立的——这就好比两个篮球场上的两个篮球之间毫不相干。

那怎样让两个篮球场上的两队人比赛呢?只能让他们去同一个地方抢同一个球——这在编程中叫“分布式锁”。

有很多实现分布式锁的方案(关系数据库、zookeeper、etcd、Redis 等),本篇单讲用 Redis 来实现分布式锁。

小试牛刀

之所以能用 Redis 实现分布式锁,依赖于其三个特性:

  1. Redis 作为独立的存储,其数据天然可以被多进程共享;
  2. Redis 的指令是单线程执行的,所以不会出现多个指令并发地读写同一块数据;
  3. Redis 指令是纯内存操作,速度是微妙级的(不考虑网络时延),性能足够高;

有些人一想到“单线程-高性能”就条件反射地回答 IO 多路复用,其实 Redis 高性能最主要就是纯内存操作。

Redis 分布式锁的大体调用框架是这样的:

循序渐进 Redis 分布式锁(以及何时不用它)

多个进程的多个线程争抢同一把 Redis 锁。

说到 Redis 分布式锁,大部分人都会想到 setnx 指令:

// setnx 使用方式 SETNX key value 

意思是:如果 key 不存在(Not eXists),则将 key 设置为 value 并返回 1,否则啥也不做并返回 0——也就是说, key 只会被设置一次,利用这个特性就可以实现锁(如果返回 1 表示加锁成功,0 则说明别人已经加锁了,本次加锁失败)。

我们写下伪代码:

// 获取 redis client 单例 var redis = NewRedisClient(redisConf);  // 通过 SETNX 指令加锁 func lock(string lockKey) bool {     result = redis.setnx(lockKey, 1);     return bool(result); }  // 通过 DEL 指令解锁 func unlock(string lockKey) {     redis.del(lockKey); } 

上面的定时任务进程中这样使用:

var lockKey = "batch:task:list"  // 上锁 if (!lock(lockKey)) {     // 获取锁失败,直接返回     return false; }  try {     // 查询数据库获取待处理任务列表     // 更新任务状态     // 入列 } finally {     // 解锁     unlock(lockKey);	 } 

很简单!半小时搞定,上线!

第一次懵逼

上线没跑几天就出问题了:任务无缘无故地不执行了,消息队列中很长时间没接收到消息了。

分析了半天,我们发现 Redis 中一直存在 batch:task:list 这条记录,没人去删除它!

盯着代码我们突然发现问题所在:这个 key 压根没有过期时间!也就是说,如果程序不 DEL 它就永远存在。

估计某进程在执行 unlock 之前崩溃了(或者哪个愣头青执行了 kill -9 ?),或者 unlock 时发生了网络问题,或者 Redis 宕机了?总之 DEL 没执行,于是这个锁永远得不到释放!

好办,加上过期时间呗:

...  // 通过 SETNX 指令加锁 // 加上过期时间,单位毫秒 func lock(string lockKey, int ttl = 3000) bool {     // 加锁     result = redis.setnx(lockKey, 1);     // 设置过期时间(毫秒)     redis.pexpire(lockKey, ttl); 	     return bool(result); }  ... 

这段代码有什么问题呢?

这里通过两次网络请求执行了两条 Redis 指令:setnx 设置 KV,expire 设置超时时间——我们前面说锁操作必须具备原子性,但这两条操作谁也不能保证要么都成功要么都失败啊。假如第一条指令(setnx)执行成功了,但 expire 由于网络原因或者进程崩溃导致执行失败了呢?此时同样会出现上面那个懵逼的问题啊。

我们可以用 Lua 脚本实现 setnx 和 expire 操作的原子性,不过 Redis 2.6.12 版本后可以用 SET 指令搞定:

// 2.6.12 后的 SET 指令格式 // 现在的 SET 指令相当强大也相当复杂,可以替代 SETNX, SETEX, PSETEX, GETSET, 此处只写出跟分布式锁有关的 // 其中两个可选参数: // -- NX 表示 Not eXists,就是 SETNX 的意思; // -- PX 是 PEXPIRE 的意思,表示设置 key 的过期时间(毫秒);  SET key value [NX] [PX milliseconds] 

改下 Lock 代码:

// 加锁 func lock(string lockKey, int ttl = 3000) bool {     // Set 函数参数对应上面的命令格式     result = redis.set(lockKey, 1, "NX", "PX", ttl); 	     return bool(result); } 

如此,加了过期时间防止锁无法释放,还保证了加锁操作的原子性,妥了,上线!

第二次懵逼

第二次上线没多久又出现了灵异事件:偶尔会出现一批任务重复入列——敢情这锁加了个寂寞?

各种打日志,终于发现了端倪:有个进程加锁 3.5 秒后才解锁,而且解锁成功了——但我们设置的锁超时时间是 3 秒啊!

也就是说,这个线程解的是别的线程的锁!

// 通过 DEL 指令解锁 // 这里直接调 Redis 的 DEL 指令删除 lockKey,并没有判断该 lockKey 的值是不是本进程设置的 // 所以在有 TTL 的情况下,删的可能是别的线程加的锁 func unlock(string lockKey) {     redis.del(lockKey); } 

和进程内的本地锁不同的是,Redis 分布式锁加入超时机制后,锁的释放就存在两种情况:

  1. 加锁者主动释放;
  2. 超时被动释放;

所以解锁(DEL)之前需要判断锁是不是自己加的,方法是在加锁的时候生成一个唯一标识。之前我们 SET key value 时 value 给的是固定值 1,现在我们换成一个随机值:

// Redis 分布式锁 // 封装成类 // 该类实例不具备线程安全性,不应跨线程使用 class Lock {     private redis;     private name;     private token;     private ttl;     private status; 	     const ST_UNLOCK = 1;     const ST_LOCKED = 2;     const ST_RELEASED = 3; 	     public function Lock(Redis redis, string name, int ttl = 3000) {         this.redis = redis; 	this.name = name; 	this.token = randStr(16);// 生成 16 字节随机字符串 	this.ttl = ttl; 	this.status = self::ST_UNLOCK;     } 	     // 加锁     public function lock() bool { 	if (this.status != self::ST_UNLOCK) { 	    return false; 	} 		 	// 使用 SET 命令加锁 	// value 不再传 1,而是设置成构造函数中生成的随机串 	try { 	    result = redis.set(this.name, this.token, "NX", "PX", this.ttl); 	    if (bool(result)) { 	        this.status = self::ST_LOCKED; 	        return true; 	    } 	} catch (Exception e) { 	    return false; 	} 	 	return false;     } 	     // 解锁     public function unlock() { 	if (this.status != self::ST_LOCKED) { 	    return; 	} 		 	// 执行 DEL 之前需要用 GET 命令判断 KEY 的值是不是当前的 token 	// 由于需要执行 GET 和 DEL 两条指令,而锁操作必须保证原子性,需要用 Lua 脚本 	// 脚本中通过 redis.call() 执行 Redis 命令 	// 注意 Lua 脚本数组下标从 1 开始 	// 这段脚本的意思是: 	// 如果 key 的值是 token,则 DEL key,否则啥也不做 	var lua = " 		if (redis.call('get', KEYS[1]) == ARGV[1]) then                     redis.call('del', KEYS[1]);                  end                  return 1; 	"; 		 	// 调 Redis 的 EVEL 指令执行 Lua 脚本 	// EVAL 指令格式: 	// EVEL script numkeys key1,key2,arg1,arg2...  	// -- script: Lua 脚本 	// -- numkeys: 说明后面的参数中,有几个是 key,这些 key 后面的都是参数 	// 比如:EVAL "redis.call('set', KEYS[1], ARGV[1])" 1 mykey hello 	// 等价于命令 SET mykey hello 	// 参见:https://redis.io/commands/eval/ 	redis.eval(lua, 1, this.name, this.token); 	this.status = self::ST_RELEASED;     } } 

业务调用:

lock = new Lock(redis, "batch:task:list");  try {     if (!lock.lock()) { 	return false;     } 	     // 加锁成功,执行业务 } finally {     lock.unlock(); } 

上面这段代码实现了:

  1. 加锁的时候设置了过期时间,防止进程崩溃而导致锁无法释放;
  2. 解锁的时候判断了当前的锁是不是自己加的,防止释放别人的锁;
  3. 加锁和解锁操作都具备原子性;

这段代码已经是生产可用了,第三次上线。

不过,还是有些优化需要做的。

优化一:锁等待

上面的 lock() 方法中,如果获取锁失败则直接返回 false,结束执行流,这可能不能满足某些业务场景。

在本地锁场景中,如果获取锁失败,线程会进入阻塞等待状态——我们希望分布式锁也能提供该功能。

我们在加锁失败时增加重试功能:

class Lock {     // 重试间隔:1 秒     const RETRY_INTERVAL = 1000;     // ...     // 重试次数(包括首次)     private retryNum; 	     // retryNum: 默认只执行一次(不重试)     public function Lock(Redis redis, string name, int ttl = 3000, int retryNum = 1) {         ... 	// 做下防御 	if (retryNum < 0 || retryNum > 20) { 	    retryNum = 1; 	} 	this.retryNum = retryNum;     } 	     // 加锁     public function lock() bool {         if (this.status != self::ST_UNLOCK) { 	    return false; 	} 		         // 使用 SET 命令加锁 	// 加入重试机制 	for (i = 0; i < this.retryNum; i++) { 	    try {                 result = redis.set(this.name, this.token, "NX", "PX", this.ttl); 		if (bool(result)) { 		    // 加锁成功,返回	 		    this.status = self::ST_LOCKED; 		    return true; 		} 	    } catch (Exception e) {             } 			 	    // 加锁失败了,等待一定的时间后重试 	    // 当前线程/协程进入休眠 	    sleep(self::RETRY_INTERVAL); 	}  	return false;     } } 

优化二:锁超时

我们再回头看看上面的加锁逻辑,其核心代码如下:

public function lock() bool {    // ...     result = redis.set(this.name, this.token, "NX", "PX", this.ttl);     if (bool(result)) {         // 加锁成功,返回	         this.status = self::ST_LOCKED; 	return true;     }     //... } 

这段代码有没有什么问题呢?

想象如下的加锁场景:

// 锁超时时间是 2 秒 var lock = new Lock(redis, name, 2000);  if (lock.lock()) {     // 加锁成功,加锁用时 2.5 秒     try { 	// 执行业务逻辑     } finally { 	// 解锁 	lock.unlock();     } } 

如上,我们创建一个有效期 2 秒的锁,然后调 Redis 命令加锁,该过程花了 2.5 秒(可能网络抖动)。

对于本线程来说,得到加锁成功的返回值,继续往下执行。

但此时该 lockKey 在 Redis 那边可能已经过期了,如果此时另一个线程去拿锁,也会成功拿到锁——如此锁的作用便失效了。

循序渐进 Redis 分布式锁(以及何时不用它)

所以,在 lock() 方法中,调 Redis 上锁成功后,需要判断上锁用时,如果时间超过了锁的有效期,则应视为上锁无效,如果有重试机制,则重试:

class Lock {     // 加锁     public function lock() bool {         if (this.status != self::ST_UNLOCK) { 	    return false; 	} 		 	for (i = 0; i < this.retryNum; i++) { 	    try { 	        // 上锁之前,保存当前毫秒数 		var startTime = getMillisecond(); 		// 上锁 		result = redis.set(this.name, this.token, "NX", "PX", this.ttl); 		// 上锁后,计算使用的时间 		var useTime = getMillisecond() - startTime; 				 		// 加锁成功条件:Redis 上锁成功,且所用的时间小于锁有效期 		if (bool(result) && useTime < this.ttl) { 	            // 加锁成功,返回	 		    this.status = self::ST_LOCKED; 		    return true; 		} 	    } catch (Exception e) {} 			 	    // 加锁失败了,等待一定的时间后重试 	    // 当前线程/协程进入休眠 	    sleep(self::RETRY_INTERVAL);         }  	return false;     } } 

如上,在判断条件中增加了加锁用时的判断。

这段代码还有问题吗?

有的。

我们用 Redis 的 SET NX 命令加锁,该命令如果发现 key 已经存在,则直接返回 0,加锁失败。

在上面的失败重试逻辑中,如果是因为加锁用时超限导致的失败(锁有效期是 2 秒,结果加锁操作用了 2.5 秒),此时我们并不能切确知道在 Redis 那边该 key 是否真的已经失效了,如果没有失效(比如来去网络用时各 1.24 秒,此时该 key 并没有失效),那么下一次的重试会因 SET NX 的机制而失败。

所以我们不能用 SET NX 加锁,只能用普通的 SET + Lua 脚本来实现:

class Lock {     // 加锁     public function lock() bool {         if (this.status != self::ST_UNLOCK) { 	    return false; 	} 		 	// 加锁的 Lua 脚本 	// 注意 Lua 中的注释不是用 // 或者 /**/,而是用 -- 	// 参数说明: 	// KEYS[1]: lockKey 	// ARGV[1]: token 	// ARGV[2]: ttl 毫秒 	var lua = " 		local val = redis.call('get', KEYS[1]); 		if (not val) then 		    -- 没有设置,则直接设置 		    return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]); 		else 		    -- 存在,则比较 val 是否等于 token 		    if (val == ARGV[1] ) then 		        -- 该 key 就是当前线程设置的 			-- 延长其 TTL 			return redis.call('pexpire', KEYS[1], ARGV[2]); 		    else 		        -- 其他线程上的锁 			return 0; 		    end 		end 	"; 		 	for (i = 0; i < this.retryNum; i++) {             // 加锁逻辑同上 	}  	return false;     } } 

如此,便解决了加锁超时导致的竞态问题——但只解决了一半。

设想这样的场景:

进程 A 加了一个有效期 5 秒的锁,加锁成功后执行业务逻辑,业务逻辑执行耗时 10 秒——就是说,在业务逻辑执行到差不多一半的时候锁就失效了,此时别的进程就可以抢到锁了,这就会导致竞态问题。

有两种解决方案:

  1. 设置个较长的过期时间。这是最简单的(而且也很有效)。比如我们预估 99% 的处理时间不超过 2 秒,则将锁有效期设置为 10 秒。该方案最大的缺点是一旦进程崩溃导致无法主动释放锁,就会导致其他进程在很长一段时间内(如 10 秒)无法获得锁,这在某些场景下可能是非常严重的。
  2. 搞个定时任务线程,定时延长锁的有效期。

方案二伪代码如下:

// 带 Refresh 版本的分布式锁 class Lock {     private redis;     private name;     private ttl;     private token;     private retryNum;     private status;     // 定时器     private timer; 	     // 锁状态:1 未加锁;2 已加锁;3 已释放     const ST_UNLOCK = 1;     const ST_LOCKED = 2;     const ST_RELEASED = 3; 	     // 刷新状态:     //	4 刷新成功;     //	5 非法(key 不存在或者不是本线程加的锁)     //	6 刷新失败(Redis 不可用)     const RF_SUC = 4;     const RF_INVALID = 5;     const RF_FAIL = 6; 	     // 构造函数     public function Lock(Redis redis, string name, int ttl = 2000, int retryNum = 1) {         ...     } 	     // 加锁     // 加锁成功后启动定时器     public function lock() bool {         if (this.status != self::ST_UNLOCK) {             return false; 	} 		 	// 加锁的 Lua 脚本,同前面的 	lua = "..."; 		 	for (i = 0; i < this.retryNum; i++) { 	    var startTime = getMillisecond(); 	    try { 	        // 执行 Lua 脚本上锁 		result = this.redis.eval(lua, 1, this.name, this.token, this.ttl); 		var useTime = getMillisecond() - startTime; 				 		if (bool(result) && useTime < this.ttl) { 		    // 加锁成功 		    this.status = self::ST_LOCKED; 		    // 启动定时器 		    this.tick(); 					 		    return true; 		} 	    } catch (Exception e) { 		// Redis 不可用 	    } 			 	    // 失败重试 	    sleep(RETRY_INTERVAL);         }  	return false;     } 	     // 启动定时器,定时刷新过期时间     private function tick() {         this.timer = startTimerInterval( 	    this.ttl / 3, 	    function () { 	        result = this.refresh(); 		if (result == self::RF_INVALID) { 		    // key 不存在,或者该锁被其他线程占用 		    // 停掉定时器 		    this.timer.stop(); 		} 	    } 	);     } 	     // 释放锁     // 需要停掉定时器     public function unlock() {         if (this.status != self::ST_LOCKED) { 	    return; 	} 		         // 释放锁的 Lua 脚本,同前 	var lua = "..."; 		 	try { 	    this.redis.eval(lua, 1, this.name, this.token);         } catch (Exception e) {} finally { 	    this.status = self::ST_RELEASED; 	    // 停掉定时器 	    this.timer.stop(); 	}     } 	     // 刷新锁过期时间     private function refresh() int {         if (this.status != self::ST_LOCKED) { 	    return self::RF_INVALID; 	} 		 	var lua = " 	    -- key 存在而且其值等于 token 才刷新过期时间 	    if (redis.call("get", KEYS[1]) == ARGV[1]) then 	        return redis.call("pexpire", KEYS[1], ARGV[2]) 	    else 	        return 0 	    end 	"; 		         try { 	    result = this.redis.eval(lua, 1, this.name, this.token, this.ttl); 	    if (result == 0) { 	        // key 不存在或者是别人加的锁 		return self::RF_INVALID; 	    } else { 	        // 刷新成功 		return self::RF_SUC; 	    } 	} catch (Exception e) { 	    // Redis 不可用 	    return self::RF_FAIL; 	}     } } 

如上,加锁成功后创建一个单独的定时器(独立的线程/协程)刷新锁的 TTL,只要锁没被主动释放(而且进程没有崩溃),就会不停地续命,保证不会过期。此时,我们就能在加锁时选择一个比较小的过期时间(比如 2 秒),一旦进程崩溃,其他进程也能较快获得锁。

上面定时器时间为何选择 ttl/3 呢?

假设锁过期时间(ttl)为 6 秒,由上面 lock() 函数逻辑可知,加锁耗时不可能超过 6 秒(超过就会判定为加锁失败)。我们假设某次加锁耗时比 6 秒小那么一丢丢(也就是近似 6 秒),接下来什么时候发起第一次刷新才能保证 Redis 那边的 key 不过期呢?极端情况下必须立即刷新(如果考虑刷新时的网络时延,就算立即刷新也不一定能保证)。

不过我们考虑的是一般情况。我们可以认为 6 秒耗时都花在网络上(Redis 本身执行时间可以忽略不计),然后再近似认为这 6 秒被来去均摊,各花 3 秒,因而当我们接收到 Redis 的响应时,该 key 在 Redis 那边的 TTL 已经用掉了一半,所以定时间隔必须小于 ttl/2,再将刷新时的网络时延考虑进去,取 ttl/3 或者 ttl/4 比较合适。

就算有了 refresh 机制,也不能说是万无一失了。

考虑 Redis 宕机或者网络不通的情况。

假设线程 A 加锁(ttl=2s)后不久 Redis 就宕机了(或者该业务服所在网络发生分区导致网络不通),宕机期间 refresh 会失败。2s 后 Redis 重启恢复正常,此时线程 A 设置的那个 key 已经过期了,其他线程就能够获取锁,如果线程 A 的执行时间超过 2s,就和其他线程产生竞态。

refresh 机制解决不了该问题,要用其他手段来保证 Redis 和锁的高可用性,如 Redis 集群、官方提供的 Redlock 方案等。

可重入性

一些语言(如 java)内置可重入锁,一些语言(如 go)则不支持。

我们通过代码说下可重入锁是什么:

var lock = newLock();  // 在同一个线程中, foo() 调 bar() // 函数 foo() 和 bar() 都在竞争同一把锁  function foo() {     lock.lock();     ...     bar();     ...     lock.unlock(); }  function bar() {     lock.lock();     // do something     lock.unlock(); } 

如上,同一个线程中 foo() 调 bar(),由于 foo() 调 bar() 之前加了锁,因而 bar() 中再竞争该锁时就会一直等待,导致 bar() 函数执行不下去,进而导致 foo() 函数无法解锁,于是造成死锁。

如果上面的 lock 是一把可重入锁,bar() 就会加锁成功。

实现原理是:加锁的 lock() 方法中会判断当前这把锁被哪个线程持有,如果持有锁的线程和现在抢锁的线程是同一个线程,则视为抢锁成功(这锁本来就是被它持有的嘛,抢啥呢)。

由于 foo() 和 bar() 是在同一个线程中调用的,所以他俩都会加锁成功。

锁是加成功了,解锁呢?bar() 中的 unlock() 要怎么处理呢?直接把锁释放掉?不行啊,foo() 中的 unlock() 还没执行呢,bar() 虽然用完锁了,但 foo() 还没用完啊,你 bar() 三下五除二把锁给释放了,其他线程拿到锁,不就和 foo() 中代码构成竞态了吗?

所以可重入锁采用信号量的思想,在内部维持了两个属性:threadid 表示哪个线程持有锁;lockNum 表示持有线程加了几次锁。同一个线程,每 lock() 一次 lockNum 加 1,每 unlock() 一次 lockNum 减 1,只有 lockNum 变成 0 了才表示这把锁真正释放了,其他线程才能用。

原理讲完了,但你不觉得上面的代码很怪吗?

既然 foo() 已经加锁了,bar() 为何还要加同一把锁呢?

在某些情况下这样做可能是有原因的,但大多数情况下,这个问题可以从设计上解决,而不是非要引入可重入锁。

比如我们可以将 bar() 声明为非线程安全的,将加锁工作交给调用者,同时限制 bar() 的可见域,防止其被滥用。

go 语言不支持可重入锁的理由就是:当你的代码需要用可重入锁了,你首先要做的是审视你的设计是否有问题。

可重入锁的便捷性可能会带来代码设计上的问题。

所以本篇并不打算去实现可重入能力——虽然实现起来并不难,无非是将上面讲的原理在 Redis 上用 Lua 脚本实现一遍而已。

不是银弹

有了锤子,全世界都是钉子。

分布式锁看似是颗银弹,但有些问题用其他方案会比分布式锁要好。

我们看看秒杀扣库存的例子。

网上很多讲分布式锁的文章都拿秒杀扣库存来举例。

秒杀场景为了应对高并发,一般会将秒杀商品库存提前写入到 Redis 中,我们假设就用字符串类型存商品库存:

// Redis 命令,设置商品 id=1234 的库存 100 件 set seckill.stock.1234 100 

另外一个用户只能参加一次秒杀,所以扣库存前需要判断该用户是否已经参加了(防止羊毛党薅羊毛)。

扣库存逻辑是这样的:

var stockKey = "seckill.stock.1234"; var userKey = "seckill.ordered.users"; var lock = new Lock(redis, "seckill");  // 此处省略活动时间的判断  try {     // 加分布式锁     lock.lock(); 	     // 判断库存     var stockNum = redis.get(stockKey);     if (stockNum <= 0) {         // 库存不足         return false;     } 	     // 判断用户是否已经参加过     if (redis.sismember(userKey, userId)) {         return false;     } 	     // 扣库存     if (redis.decr(stockKey) >= 0) {         // 下单 	...     } else {         return false;     } 	     // 将用户加入到已参加集合中     redis.sadd(userKey, userId);     return true; } catch (Exception e) {     // 异常 } finally {     // 解锁     lock.unlock(); } 

以上逻辑为何要用分布式锁呢?

假设不用分布式锁,羊毛党同时发了十个请求(同一个用户),由于 redis.sismember(userKey, userId) 判断都会返回 0,于是都能扣库存下单,羊毛薅了一地。

但该场景有没有更优的解决方案呢?

我们使用分布式锁是为了保证临界区代码(lock 保护的区域)执行的原子性——不过 Redis 的原子性还可以通过 Lua 脚本来实现吧。

上面代码一共进行了 6 次 Redis 交互,假设每次用时 50ms,光 Redis 交互这块就用了 0.3s 的时间。

如果我们将这些逻辑封装成 Lua 脚本,只需要一次 Redis 交互就能保证原子性:

var lua = "     -- 参数说明:     --	KEYS[1]: actKey     --	KEYS[2]: userKey     --	KEYS[3]: stockKey     --	ARGV[1]: userId      -- 判断活动时间     -- (事先将活动的关键信息保存到 Redis hash 中)     -- 取活动的开始和结束时间     local act = redis.call('hmget', KEYS[1], 'start', 'end');     local now = redis.call('time')[1];     if (not act[1] or now < act[1] or now >= act[2])     then         return 0;     end 	     -- 判断库存     local stock = redis.call('get', KEYS[3]);     if (not stock or tonumber(stock) <= 0)     then         return 0;     end 	     -- 判断用户是否已经参与过     if (redis.call('sismember', KEYS[2], ARGV[1]) == 1)     then         return 0;     end 	     -- 扣库存     if (redis.call('decr', KEYS[3]) >= 0)     then         -- 加入用户         return redis.call('sadd', KEYS[2], ARGV[1]);     else         return 0;     end ";  var actKey = "seckill.act."+actId; var userKey = actKey + ".users"; var stockKey = actKey + ".stock." + goodsId;  if (redis.eval(lua, 3, actKey, userKey, stockKey, userId)) {     // 扣库存成功,下单     ... } 

上面的脚本还可以先缓存到 Redis 服务器中,然后用 evalsha 命令执行,这样客户端就不用每次都传这么一大坨代码,进一步提升传输性能。

总结

本篇我们从 setnx 命令开始实现了一个最简单的分布式锁,而后通过实际使用发现其存在各种缺陷并逐步增强其实现,主要涉及到以下几个方面:

  1. 被动释放。进程崩溃后,进程本地锁自然会销毁,但 Redis 锁不会。所以要加 TTL 机制,防止因加锁者崩溃而导致锁无法释放;
  2. 属主。线程不能释放别的线程的锁;
  3. 锁等待。加锁失败时可以等待一段时间并重试,而不是立即返回;
  4. 保活。通过定时刷新锁的 TTL 防止被动释放;

不难发现,分布式锁比进程内本地锁要复杂得多,也重得多(本地锁操作是纳秒级别,分布式锁操作是毫秒级别),现实中,在使用分布式锁之前我们要思考下有没有其它更优方案,比如乐观锁、Lua 脚本等。

另外需要注意的是,分布式锁只能解决多进程之间的并发问题,并不能实现数据操作的幂等性。一个例子是增减积分的操作。

增加积分的例子:

// 给用户增加积分 // sourceType、sourceId:积分来源标识,如消费赠送积分场景的 orderCode // 幂等性:同样的 userId-sourceType-sourceId 不能重复加积分 function addBonus(userId, sourceType, sourceId, bonus) {     // 加分布式锁     var lock = new Lock(...); 	     try {         if (!lock.lock()) {             return false;         } 		         // 检查是否重复         if (isRepeat(userId, sourceType, sourceId)) {             return false;         } 		         // 加积分         add(userId, sourceType, sourceId, bonus);     } finally {         lock.unlock();     } } 

上面分布式锁的作用是防止并发请求(调用端 bug?薅羊毛?),而该操作的幂等性是由 isRepeat() 保证的(查数据库)。

保障幂等性一般有悲观锁和乐观锁两种模式。

上面这种属于悲观锁模式(把整个操作锁起来),另一种乐观锁实现方式是给 userId-sourceType-sourceId 加上组合唯一键约束,此时就不需要加分布式锁,也不需要 isRepeat() 检测,直接 add(userId, sourceType, sourceId, bonus) 就能搞定。

最后说下文中为啥使用伪代码(而不是用具体某一门编程语言实现)。

用伪代码的最主要目的是省去语言特定的实现细节,将关注点放在逻辑本身。

比如 redis 客户端,不同语言有不同的使用方式,就算同一门语言的不同类库用法也不同,有些语言的类库用起来又臭又长,影响心情。

伪代码不受特定语言约束,用起来自由自在,本文中 redis 客户端的使用方式和 Redis 官方的原始命令格式完全一致,没有额外的心智负担。

再比如生成 token 的随机字符串函数 randStr(),go 语言要这样写:

func randStr(size int) (string, error) {     sl := make([]byte, size)     if _, err := io.ReadFull(rand.Reader, sl); err != nil {         return "", err     }     return base64.RawURLEncoding.EncodeToString(sl), nil } 

代码虽然不多,但没玩过 go 的小伙伴看到这儿心里是不是要起伏那么两三下?但这玩意怎么实现跟本文的主题没半毛钱关系。

相反,本文的 lua 脚本都是货真价实的,测试通过的——因为这是本文的核心啊。

伪代码的缺点是它不能“拎包入住”,但本文的重点并不是要写个源码库——我们没必要真的自己写一个,直接用 redission 或者其他什么库不香吗?

本文的重点在于分析 Redis 分布式锁的原理,分布式锁面临哪些问题?解决思路是什么?使用时要注意什么?知其然知其所以然。

当你不知其所以然时,很多东西显得特高大上,什么“看门狗”,搞得神乎其神,当搞明白其原理和目的时,也就那么回事。

循序渐进 Redis 分布式锁(以及何时不用它)

发表评论

相关文章