高并发环境下生成序列编码重复问题分析

一、背景

有个业务系统(订单系统),通过后台日志和监控观察,系统偶尔会出现重复唯一索引问题,例如:后台日志片段

 Duplicate entry 'service_no'  for key 'idx_service_no' ....

也就是说写入数据与数据库已有数据发生重复。

下面我们分析一下问题出现在哪里:

这个字段就是业务编码 :service_no

这个取号规则是:要固定长度13位数,由首写大写字母 F+年月日(201201)+6 位有序数字组成。

6位有序数字,是当天数据库增量已有数据最大编码序列依次累加。

首先说明一下:每天6位数字(最高是999999)一天最多99万个序列号绝对够当前业务使用的,(实际上一天最多也就几万个单号),所以号量是满足业务需求的。

业务规则没有问题,那说明是程序代码逻辑有问题了,我们来看一下代码情况:

 

二、分析代码逻辑结构

 

1、首先取号

获取生成下一个序列号取号方法如下:

 

/** 	 * 创建编码内部自带加锁 	 * @param rule 编码规则参数 	 * @param params 参数数组 	 * @return 取号结果字符串 	 */    @Transactional 	public String create(String rule, String... params) { 		String code = ""; 		String lockName = rule; 		if (params.length > 0 && StringUtils.isNotEmpty(params[0])) { 			lockName = rule + ":" + params[0]; 		} 		if (params.length > 1) { 			lockName = rule + ":" + params[0] + ":" + params[1]; 		}  		try {             //加jedis客户端工具分布式锁 			RedisLocker.lock(lockName, 10); 			code = getNext(rule, params); 		} finally {             //释放锁 			RedisLocker.unlock(lockName); 		} 		return code; 	}  	/** 	 * 创建序列编码,无事务控制,需要依赖外层加redis锁! 	 * @param rule 生成规则 	 * @param params 参数列表 	 * @return 取号结果字符串 	 */ 	private String getNext(String rule, String... params) { 		String[] rules = rule.split("-"); 		CodeCondition condition = new CodeCondition(); 		Code code = new Code(); 		String prefix = ""; 		String num = ""; 		String digit = ""; 		int i = 0; 		for (String str : rules) { 			if (str.equals("r")) { 				// 类型 				code.setType(params[i]); 				condition.setType(params[i]); 				prefix += params[i]; 				i++; 			} else if (str.equals("c")) { 				// 商家编码 				code.setShopCode(params[i]); 				condition.setShopCode(params[i]); 				prefix += params[i]; 				i++; 			} else if (str.contains("yy") || str.contains("MM")) { 				// 日期 				//SimpleDateFormat formatter = new SimpleDateFormat(str); 				String t = DateUtils.getCurrentDate(str); 				code.setTime(t); 				prefix += t; 			} else if (str.contains("N")) { 				// 数字 				digit = str.substring(1); 			} else { 				// 其它 				prefix += str; 			} 		} 		code.setPrefix(prefix); 		condition.setPrefix(prefix); 		if (digit.length() > 0) { 			int n = 1; 			Integer serialNumber = codeDao.findNewOneByManual(condition); 			if (serialNumber != null) { 				n = serialNumber + 1; 			} 			code.setSerialNumber(n); 			num = String.format("%0" + digit + "d", n); 		} 		code.setCode(prefix + num); 		// 添加3次重试机制  		boolean success = codeDao.getNextCode(code); 		if (success) { 			return prefix + num; 		} 		return null; 	}

 

2、数据操作层事务

提交到DAO层准备交给执行JDBC去执行提交到数据库,主要使用了手动控制事务提交代码如下:

 

/** 	 * 手动控制事务 	 * @param param 提交新的对象值更新 	 * @return 	 */ 	public boolean getNextCode(final Code param){         //手动控制事务 		transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); 		transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); 		return transactionTemplate.execute(new TransactionCallback<Boolean>() { 			public Boolean doInTransaction(final TransactionStatus status) { 				try { 					int i = 0; 					CodeCondition con = new CodeCondition(); 					con.setType(param.getType()); 					con.setTime(param.getTime()); 					con.setShopCode(param.getShopCode());                     //根据入参条件查询是否存在编码对象数据 					Code c = findNewOne(con); 					if(c == null){ 						i = jdbcTemplate.update("INSERT INTO xx_code(id,code,type,time,shop_code,serial_number,prefix,remarks) VALUES(?,?,?,?,?,?,?,?)", 								UUID.randomUUID().toString().replaceAll("-", ""), 								param.getCode(), 								param.getType(), 								param.getTime(), 								param.getShopCode(), 								param.getSerialNumber(), 								param.getPrefix(), 								param.getRemarks()); 					} else { 						List<Object> params = new ArrayList<Object>(); 						params.add(param.getCode()); 						params.add(param.getPrefix()); 						params.add(param.getSerialNumber()); 						params.add(c.getTime()); 						params.add(c.getId()); 						//使用旧值做匹配条件 						String sql = "UPDATE xx_code SET code=?, prefix=?,serial_number=?,time=?  WHERE id=? "; 						i = jdbcTemplate.update(sql, params.toArray()); 					} 					return i > 0; 				} catch (Exception ex) { 					 status.setRollbackOnly(); 					logger.error(ex.getMessage(),ex); 				} 				return false; 			} 		}); 	}

 

这里备注说明一下几个事务属于参数:

PROPAGATION_REQUIRED-- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 

假如当前正要执行的事务不在另外一个事务里,那么就起一个新的事务 。

ServiceA {           
     void methodA() {  
         ServiceB.methodB();  
     }  
}      
ServiceB {           
     void methodB() {  
     }           
}  
 比如说,ServiceB.methodB的事务级别定义为PROPAGATION_REQUIRED, 那么由于执行ServiceA.methodA的时候
  1、如果ServiceA.methodA已经起了事务,这时调用ServiceB.methodB,ServiceB.methodB看到自己已经运行在ServiceA.methodA的事务内部,就不再起新的事务。这时只有外部事务并且他们是共用的,所以这时ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚。
  2、如果ServiceA.methodA没有事务,ServiceB.methodB就会为自己分配一个事务。这样,在ServiceA.methodA中是没有事务控制的。只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚。

 在 spring的 TransactionDefinition接口中一共定义了六种事务传播属性:
 PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 
PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务,就以非事务方式执行。 
PROPAGATION_MANDATORY -- 支持当前事务,如果当前没有事务,就抛出异常。 
PROPAGATION_REQUIRES_NEW -- 新建事务,如果当前存在事务,把当前事务挂起。 
PROPAGATION_NOT_SUPPORTED -- 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。 
PROPAGATION_NEVER -- 以非事务方式执行,如果当前存在事务,则抛出异常。 
PROPAGATION_NESTED -- 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。 
前六个策略类似于EJB CMT,第七个(PROPAGATION_NESTED)是Spring所提供的一个特殊变量。 
它要求事务管理器或者使用JDBC 3.0 Savepoint API提供嵌套事务行为(如Spring的DataSourceTransactionManager)。

 

3、分布式锁工具

使用jedis工具包作为分布式锁代码如下:

 

/** 	 * 在指定时间内等待获取锁 	 * @param waitTime 等待锁的时间 	 *  	 */ 	public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { 		// 系统当前时间,以毫微秒为单位。 		long nano = System.nanoTime();  		do { 			if(tryLock()){ 				logger.debug(this.lockValue + "获取锁"); 				return Boolean.TRUE; 			} 			Thread.sleep(new Random().nextInt(100) + 1); 		} while ((System.nanoTime() - nano) < unit.toNanos(waitTime));  		return Boolean.FALSE; 	} 	 	/** 	 * 阻塞式加锁 	 */ 	public void lock() { 		while(!tryLock()){ 			try { 				// 睡眠,降低抢锁频率,缓解redis压力 				Thread.sleep(new Random().nextInt(100) + 1);  			} catch (InterruptedException e) { 				e.printStackTrace(); 			} 		} 	} 	 	/** 	 * 获取锁的线程解锁 	 * 不足:当时阿里云redis集群版本暂不支持执行lua脚本eval函数。 	 * 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN 	 *  	 */ 	public void unlock() { 		// 检查当前线程是否持有锁 		if (this.lockValue.equals(jedis.get(this.lockName))) { 			logger.debug(this.lockValue + "释放锁"); 			try { 				jedis.del(this.lockName); 			} finally { 				// Jedis 客户端版本是使用 Jedis-2.7.2版本;如果是2.9以上本的版注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。 				if (jedis != null) { 					jedis.close(); 				} 			} 		} else { 			logger.debug(Thread.currentThread().getName() + "并非持有锁的线程,未能解锁"); 		} 	}

 

上面展示代码展示分布式销和事务一些使用,但请注意这里有坑!!!

 

4、存在哪些坑?

坑1Jedis客户端版本要注意一下,如果是3.0.1及以上的话 jedis.close();已经被重写,请看客方源代码。

2.9 版本以前连接池使用有returnResource接口方法,3.0.1之后版本被去掉了。

官方重写了close方法,jedis.close不能直接调用。

try {
    jedis = pool.getResource();
} finally {
if (jedis != null) {
    jedis.close();
    }
}

某一次升级了jedis client版本还导致生产环境redis服务被打暴,引发重大事故,所以如果使用jedis client建议使用2.9以下版本还靠谱一些。

其实这种写法,还有一些问题的,需要进一步改进。如何改进?我们往下一步分析。

改进版本1(使用lua脚本替代):

 /**      * 尝试获取分布式锁      * @param jedis Redis客户端      * @param lockKey 锁      * @param requestId 请求标识      * @param expireTime 超期时间      * @return 是否获取成功      */     public static boolean tryLock(Jedis jedis, String lockKey, String requestId, long expireTime) {          //jedis 3.0之后写法         /*   SetParams params = new SetParams();         params.px(expireTime);         params.nx();*/         String result = jedis.set(lockKey,  requestId,"NX", "PX", expireTime);          if (LOCK_SUCCESS.equals(result)) {             return true;         }         return false;      }      /**      * 释放分布式锁(LUA脚本实现)      * @param jedis Redis客户端      * @param lockKey 锁      * @param requestId 请求标识      * @return      */     public static boolean unLock(Jedis jedis, String lockKey, String requestId) {          String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return " +                 "0 end";         Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));         if (RELEASE_SUCCESS.equals(result)) {             return true;         }         if (jedis != null) {             jedis.close();         }         return false;     }

 

 当时阿里云redis集群版本暂不支持执行lua脚本eval函数。所以当时lua脚本方式的无发挥之地!
 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN

 

改进版本2(使用redisson替代):

/**  * 利用redisson client 实现分布式锁  * @author cgli  */ public final class RedisLocker {  	private final static String LOCKER_PREFIX = "lock:";  	private static final Logger logger = LoggerFactory.getLogger(RedisLocker.class);  	private RedisLocker() {      }  	/** 	 * 获取连接配置实例 	 * @return 	 */ 	private static RedissonClient getClient() { 		return SingletonHolder.client; 	}  	/**      * 根据name对进行上锁操作,redissonLock 阻塞式的,采用的机制发布/订阅 	 * timeout结束强制解锁,防止死锁 :1分钟      * @param lockName 锁名称      */     public static void lock(String lockName){        lock(lockName,60);     }      /** 	 * 根据name对进行上锁操作采用redisson RLOCK加锁方式 	 * @param lockName 锁名称 	 * @param leaseTime 结束强制解锁,防止死锁 :单位秒 	 */ 	public static void lock(String lockName,long leaseTime){ 		String key = LOCKER_PREFIX + lockName; 		RLock lock = getClient().getLock(key); 		//lock提供带timeout参数,timeout结束强制解锁,防止死锁 :1分钟 		lock.lock(leaseTime, TimeUnit.SECONDS); 	}      /**      * 根据name对进行解锁操作      * @param lockName      */     public static void unlock(String lockName){ 		String key = LOCKER_PREFIX + lockName; 		RLock lock = getClient().getLock(key); 		if(lock.isLocked()){ 			if(lock.isHeldByCurrentThread()){ 				lock.unlock(); 			} 		}     }  	/** 	 * 	 * @param resourceName 锁的KEY名称 	 * @param worker 回调外部工作任务 	 * @param lockTime 锁定超时时间,默认acquireTimeout=100second 获取锁的超时时间 	 * @param <T> 	 * @return 	 * @throws Exception 	 */ 	public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, 			long lockTime) throws Exception { 		return lock(resourceName, worker, 100, lockTime); 	}  	/** 	 * 	 * @param resourceName 锁的KEY名称 	 * @param worker 回调外部工作任务 	 * @param acquireTimeout 获取锁的超时时间 	 * @param lockTime 锁定超时时间 	 * @param <T> 	 * @return 	 * @throws Exception 	 */ 	public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long acquireTimeout, 							 long lockTime) throws Exception { 		RLock lock = getClient().getLock(LOCKER_PREFIX + resourceName);  		// Acquire lock and release it automatically after 10 seconds 		// if unlock method hasn't been invoked 		//lock.lock(10, TimeUnit.SECONDS);  		try { 			// Wait for acquireTimeout seconds and automatically unlock it after lockTime seconds 			boolean res = lock.tryLock(acquireTimeout, lockTime, TimeUnit.SECONDS); 			if (res) { 				return worker.execute(); 			} 		} finally { 			if (lock != null) { 				lock.unlock(); 			} 		} 		return null; 	}  	/** 	 * 内部类实现单例模式 	 */ 	static class SingletonHolder { 		private static RedissonClient client = init();  		private static RedissonClient init() { 			RedisProperties properties = ApplicationContextHolder.getApplicationContext().getBean(RedisProperties.class); 			String host = properties.getRedisHost(); 			int port = Integer.parseInt(properties.getRedisPort()); 			String password = properties.getRedisPassword(); 			if (StringUtils.isEmpty(password)) { 				password = null; 			} 			int database = Integer.parseInt(properties.getRedisDataBase()); 			try { 				Config config = new Config(); 				config.useSingleServer() 						.setAddress("redis://" + host + ":" + port) 						.setPassword(password) 						.setDatabase(database) 						//同任何节点建立连接时的等待超时。时间单位是毫秒。默认:10000 						.setConnectTimeout(30000) 						//当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。默认:3000 						//等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认:3000 						.setTimeout(10000) 						//如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 						// 计时。默认值:3 						.setRetryAttempts(5) 						//在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。     默认值:1500 						.setRetryInterval(3000); 				return Redisson.create(config); 			} catch (Exception e) { 				logger.error(e.getMessage(), e); 			} 			return null; 		} 	}  }

 

 坑2:使用@Transactional 注解嵌套事务的问题,会导致一些事务混乱,达不到最终的数据效果。

 回去最开始代码处

    @Transactional  //这里加了事务注解
    public String create(String rule, String... params) {

     try {
            //加jedis客户端工具分布式锁
            RedisLocker.lock(lockName, 10);
            code = getNext(rule, params);
        } finally {
            //释放锁
            RedisLocker.unlock(lockName);
        }

  }

 这种结果达不到预期的结果,就是会出现有时会产生重复的编码,如本文提出的问题。这是为什么呢?

由于spring的AOP机制,会在update/save方法之前开启事务,在这之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁代码块执行是在事务之内执行的。

可以推断在代码块执行完时,事务还未提交,这时如果其他线程进入锁代码块后,读取的库存数据就不是最新的,就可能产生了不是你想要的结果数据。

高并发环境下生成序列编码重复问题分析

 

这个问题我们验证测试一下,写一个测试用例

高并发环境下生成序列编码重复问题分析

用jmeter压力测试跑一下,取序列号的结果

 

高并发环境下生成序列编码重复问题分析

 高并发环境下生成序列编码重复问题分析

在日志分析中,经常是有两个线程查到上一个相同的序列号,拿到的不是更新之后的数据结果,如下图:

高并发环境下生成序列编码重复问题分析

 

产生重复编码,说明jedis分布式锁并没有真实锁住。问题就是出现在最外层的 @Transactional注解上,最外层调用代码完全没有必要加了@Transactional。

因为最内层已经加了手动控制事务的控制。拿掉外层的@Transactional再跑压力测试一切正常。

高并发环境下生成序列编码重复问题分析

 

、分析代码逻辑结构

 

1、Redis java客户端不推荐使用jedis,特别是2.9以上的版本,代码没有处理好很容易搞把redis服务搞死(连接数打满),推荐使用redisson代替,性能高且内置实现连接池。

如果真的一定要用jedis使用2.9以下版本并使用 lua脚本来控制,才能实现真正原子性操作。

2、事务注解@Transactional事务控制,嵌套使用时要注意,尽量控制在最小单元的最内层使用,在最外层(大方法)使用有风险,特别是跟锁一起使用时要注意控制两者顺序。

另外@Transactional在分布式环境下,远程调用无效的,并不能当作分布式事务来对待,这个业内有成熟其他方案替代。

3、其实取号生成序列号服务,没有必要使用数据库当作序列计数,完全可以使用redis计数器做实现(内部版本2年前就用redis版本来取号,连续运行两2年多没有发现有啥问题)。

示例代码如下:

 private String generateByRedis(BusinessCodeCondition condition) {         String time = "";         String prefix = "";         String type = condition.getType();         StringBuilder sb = new StringBuilder();         sb.append(type);         if (StringUtils.isNotEmpty(condition.getShopCode())) {             sb.append(condition.getShopCode());         }         if (StringUtils.isNotEmpty(condition.getDateFormat())) {             time = DateFormatUtil.formatDate(condition.getDateFormat(), new Date());             sb.append(time);         }         prefix = sb.toString();         sb.setLength(0);         String key = KEY_PREFIX + prefix;         //long n = redisTemplate.opsForValue().increment(key, 1L);         //redisTemplate.expire(key, EXPIRE_TIME, TimeUnit.DAYS);         long n = getIncrementNum(key, condition.getDateFormat());         return String.format("%s%0" + condition.getDigitCount() + "d", prefix, n);     }      //使用redis计数器取序列,每天过期前一天的key值回收。     private Long getIncrementNum(String key, String dateFormat) {         RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());         Long counter = entityIdCounter.incrementAndGet();         if ((null == counter || counter.longValue() == 1)) {             if (StringUtils.isNotEmpty(dateFormat) && dateFormat.indexOf("yyMMdd") > -1) {                 entityIdCounter.expire(EXPIRE_TIME, TimeUnit.DAYS);             }         }         return counter;     }

 

发表评论

相关文章

当前内容话题