一、背景
有个业务系统(订单系统),通过后台日志和监控观察,系统偶尔会出现重复唯一索引问题,例如:后台日志片段
Duplicate entry 'service_no' for key 'idx_service_no' ....
也就是说写入数据与数据库已有数据发生重复。
下面我们分析一下问题出现在哪里:
这个字段就是业务编码 :service_no
这个取号规则是:要固定长度13位数,由首写大写字母 F+年月日(201201)+6 位有序数字组成。
6位有序数字,是当天数据库增量已有数据最大编码序列依次累加。
首先说明一下:每天6位数字(最高是999999)一天最多99万个序列号绝对够当前业务使用的,(实际上一天最多也就几万个单号),所以号量是满足业务需求的。
业务规则没有问题,那说明是程序代码逻辑有问题了,我们来看一下代码情况:
二、分析代码逻辑结构
1、首先取号
获取生成下一个序列号取号方法如下:
/** * 创建编码内部自带加锁 * @param rule 编码规则参数 * @param params 参数数组 * @return 取号结果字符串 */ @Transactional public String create(String rule, String... params) { String code = ""; String lockName = rule; if (params.length > 0 && StringUtils.isNotEmpty(params[0])) { lockName = rule + ":" + params[0]; } if (params.length > 1) { lockName = rule + ":" + params[0] + ":" + params[1]; } try { //加jedis客户端工具分布式锁 RedisLocker.lock(lockName, 10); code = getNext(rule, params); } finally { //释放锁 RedisLocker.unlock(lockName); } return code; } /** * 创建序列编码,无事务控制,需要依赖外层加redis锁! * @param rule 生成规则 * @param params 参数列表 * @return 取号结果字符串 */ private String getNext(String rule, String... params) { String[] rules = rule.split("-"); CodeCondition condition = new CodeCondition(); Code code = new Code(); String prefix = ""; String num = ""; String digit = ""; int i = 0; for (String str : rules) { if (str.equals("r")) { // 类型 code.setType(params[i]); condition.setType(params[i]); prefix += params[i]; i++; } else if (str.equals("c")) { // 商家编码 code.setShopCode(params[i]); condition.setShopCode(params[i]); prefix += params[i]; i++; } else if (str.contains("yy") || str.contains("MM")) { // 日期 //SimpleDateFormat formatter = new SimpleDateFormat(str); String t = DateUtils.getCurrentDate(str); code.setTime(t); prefix += t; } else if (str.contains("N")) { // 数字 digit = str.substring(1); } else { // 其它 prefix += str; } } code.setPrefix(prefix); condition.setPrefix(prefix); if (digit.length() > 0) { int n = 1; Integer serialNumber = codeDao.findNewOneByManual(condition); if (serialNumber != null) { n = serialNumber + 1; } code.setSerialNumber(n); num = String.format("%0" + digit + "d", n); } code.setCode(prefix + num); // 添加3次重试机制 boolean success = codeDao.getNextCode(code); if (success) { return prefix + num; } return null; }
2、数据操作层事务
提交到DAO层准备交给执行JDBC去执行提交到数据库,主要使用了手动控制事务提交代码如下:
/** * 手动控制事务 * @param param 提交新的对象值更新 * @return */ public boolean getNextCode(final Code param){ //手动控制事务 transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); return transactionTemplate.execute(new TransactionCallback<Boolean>() { public Boolean doInTransaction(final TransactionStatus status) { try { int i = 0; CodeCondition con = new CodeCondition(); con.setType(param.getType()); con.setTime(param.getTime()); con.setShopCode(param.getShopCode()); //根据入参条件查询是否存在编码对象数据 Code c = findNewOne(con); if(c == null){ i = jdbcTemplate.update("INSERT INTO xx_code(id,code,type,time,shop_code,serial_number,prefix,remarks) VALUES(?,?,?,?,?,?,?,?)", UUID.randomUUID().toString().replaceAll("-", ""), param.getCode(), param.getType(), param.getTime(), param.getShopCode(), param.getSerialNumber(), param.getPrefix(), param.getRemarks()); } else { List<Object> params = new ArrayList<Object>(); params.add(param.getCode()); params.add(param.getPrefix()); params.add(param.getSerialNumber()); params.add(c.getTime()); params.add(c.getId()); //使用旧值做匹配条件 String sql = "UPDATE xx_code SET code=?, prefix=?,serial_number=?,time=? WHERE id=? "; i = jdbcTemplate.update(sql, params.toArray()); } return i > 0; } catch (Exception ex) { status.setRollbackOnly(); logger.error(ex.getMessage(),ex); } return false; } }); }
这里备注说明一下几个事务属于参数:
PROPAGATION_REQUIRED-- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
假如当前正要执行的事务不在另外一个事务里,那么就起一个新的事务 。
ServiceA {
void methodA() {
ServiceB.methodB();
}
}
ServiceB {
void methodB() {
}
}
比如说,ServiceB.methodB的事务级别定义为PROPAGATION_REQUIRED, 那么由于执行ServiceA.methodA的时候
1、如果ServiceA.methodA已经起了事务,这时调用ServiceB.methodB,ServiceB.methodB看到自己已经运行在ServiceA.methodA的事务内部,就不再起新的事务。这时只有外部事务并且他们是共用的,所以这时ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚。
2、如果ServiceA.methodA没有事务,ServiceB.methodB就会为自己分配一个事务。这样,在ServiceA.methodA中是没有事务控制的。只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚。
在 spring的 TransactionDefinition接口中一共定义了六种事务传播属性:
PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY -- 支持当前事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW -- 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED -- 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER -- 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED -- 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
前六个策略类似于EJB CMT,第七个(PROPAGATION_NESTED)是Spring所提供的一个特殊变量。
它要求事务管理器或者使用JDBC 3.0 Savepoint API提供嵌套事务行为(如Spring的DataSourceTransactionManager)。
3、分布式锁工具
使用jedis工具包作为分布式锁代码如下:
/** * 在指定时间内等待获取锁 * @param waitTime 等待锁的时间 * */ public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { // 系统当前时间,以毫微秒为单位。 long nano = System.nanoTime(); do { if(tryLock()){ logger.debug(this.lockValue + "获取锁"); return Boolean.TRUE; } Thread.sleep(new Random().nextInt(100) + 1); } while ((System.nanoTime() - nano) < unit.toNanos(waitTime)); return Boolean.FALSE; } /** * 阻塞式加锁 */ public void lock() { while(!tryLock()){ try { // 睡眠,降低抢锁频率,缓解redis压力 Thread.sleep(new Random().nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 获取锁的线程解锁 * 不足:当时阿里云redis集群版本暂不支持执行lua脚本eval函数。 * 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN * */ public void unlock() { // 检查当前线程是否持有锁 if (this.lockValue.equals(jedis.get(this.lockName))) { logger.debug(this.lockValue + "释放锁"); try { jedis.del(this.lockName); } finally { // Jedis 客户端版本是使用 Jedis-2.7.2版本;如果是2.9以上本的版注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。 if (jedis != null) { jedis.close(); } } } else { logger.debug(Thread.currentThread().getName() + "并非持有锁的线程,未能解锁"); } }
上面展示代码展示分布式销和事务一些使用,但请注意这里有坑!!!
4、存在哪些坑?
坑1:Jedis客户端版本要注意一下,如果是3.0.1及以上的话 jedis.close();已经被重写,请看客方源代码。
2.9 版本以前连接池使用有returnResource接口方法,3.0.1之后版本被去掉了。
官方重写了close方法,jedis.close不能直接调用。
try {
jedis = pool.getResource();
} finally {
if (jedis != null) {
jedis.close();
}
}
某一次升级了jedis client版本还导致生产环境redis服务被打暴,引发重大事故,所以如果使用jedis client建议使用2.9以下版本还靠谱一些。
其实这种写法,还有一些问题的,需要进一步改进。如何改进?我们往下一步分析。
改进版本1(使用lua脚本替代):
/** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryLock(Jedis jedis, String lockKey, String requestId, long expireTime) { //jedis 3.0之后写法 /* SetParams params = new SetParams(); params.px(expireTime); params.nx();*/ String result = jedis.set(lockKey, requestId,"NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁(LUA脚本实现) * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return */ public static boolean unLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return " + "0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } if (jedis != null) { jedis.close(); } return false; }
当时阿里云redis集群版本暂不支持执行lua脚本eval函数。所以当时lua脚本方式的无发挥之地!
参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN
改进版本2(使用redisson替代):
/** * 利用redisson client 实现分布式锁 * @author cgli */ public final class RedisLocker { private final static String LOCKER_PREFIX = "lock:"; private static final Logger logger = LoggerFactory.getLogger(RedisLocker.class); private RedisLocker() { } /** * 获取连接配置实例 * @return */ private static RedissonClient getClient() { return SingletonHolder.client; } /** * 根据name对进行上锁操作,redissonLock 阻塞式的,采用的机制发布/订阅 * timeout结束强制解锁,防止死锁 :1分钟 * @param lockName 锁名称 */ public static void lock(String lockName){ lock(lockName,60); } /** * 根据name对进行上锁操作采用redisson RLOCK加锁方式 * @param lockName 锁名称 * @param leaseTime 结束强制解锁,防止死锁 :单位秒 */ public static void lock(String lockName,long leaseTime){ String key = LOCKER_PREFIX + lockName; RLock lock = getClient().getLock(key); //lock提供带timeout参数,timeout结束强制解锁,防止死锁 :1分钟 lock.lock(leaseTime, TimeUnit.SECONDS); } /** * 根据name对进行解锁操作 * @param lockName */ public static void unlock(String lockName){ String key = LOCKER_PREFIX + lockName; RLock lock = getClient().getLock(key); if(lock.isLocked()){ if(lock.isHeldByCurrentThread()){ lock.unlock(); } } } /** * * @param resourceName 锁的KEY名称 * @param worker 回调外部工作任务 * @param lockTime 锁定超时时间,默认acquireTimeout=100second 获取锁的超时时间 * @param <T> * @return * @throws Exception */ public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long lockTime) throws Exception { return lock(resourceName, worker, 100, lockTime); } /** * * @param resourceName 锁的KEY名称 * @param worker 回调外部工作任务 * @param acquireTimeout 获取锁的超时时间 * @param lockTime 锁定超时时间 * @param <T> * @return * @throws Exception */ public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long acquireTimeout, long lockTime) throws Exception { RLock lock = getClient().getLock(LOCKER_PREFIX + resourceName); // Acquire lock and release it automatically after 10 seconds // if unlock method hasn't been invoked //lock.lock(10, TimeUnit.SECONDS); try { // Wait for acquireTimeout seconds and automatically unlock it after lockTime seconds boolean res = lock.tryLock(acquireTimeout, lockTime, TimeUnit.SECONDS); if (res) { return worker.execute(); } } finally { if (lock != null) { lock.unlock(); } } return null; } /** * 内部类实现单例模式 */ static class SingletonHolder { private static RedissonClient client = init(); private static RedissonClient init() { RedisProperties properties = ApplicationContextHolder.getApplicationContext().getBean(RedisProperties.class); String host = properties.getRedisHost(); int port = Integer.parseInt(properties.getRedisPort()); String password = properties.getRedisPassword(); if (StringUtils.isEmpty(password)) { password = null; } int database = Integer.parseInt(properties.getRedisDataBase()); try { Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setPassword(password) .setDatabase(database) //同任何节点建立连接时的等待超时。时间单位是毫秒。默认:10000 .setConnectTimeout(30000) //当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。默认:3000 //等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认:3000 .setTimeout(10000) //如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) // 计时。默认值:3 .setRetryAttempts(5) //在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 默认值:1500 .setRetryInterval(3000); return Redisson.create(config); } catch (Exception e) { logger.error(e.getMessage(), e); } return null; } } }
坑2:使用@Transactional 注解嵌套事务的问题,会导致一些事务混乱,达不到最终的数据效果。
回去最开始代码处
@Transactional //这里加了事务注解
public String create(String rule, String... params) {
try {
//加jedis客户端工具分布式锁
RedisLocker.lock(lockName, 10);
code = getNext(rule, params);
} finally {
//释放锁
RedisLocker.unlock(lockName);
}
}
这种结果达不到预期的结果,就是会出现有时会产生重复的编码,如本文提出的问题。这是为什么呢?
由于spring的AOP机制,会在update/save方法之前开启事务,在这之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁代码块执行是在事务之内执行的。
可以推断在代码块执行完时,事务还未提交,这时如果其他线程进入锁代码块后,读取的库存数据就不是最新的,就可能产生了不是你想要的结果数据。
这个问题我们验证测试一下,写一个测试用例
用jmeter压力测试跑一下,取序列号的结果
在日志分析中,经常是有两个线程查到上一个相同的序列号,拿到的不是更新之后的数据结果,如下图:
产生重复编码,说明jedis分布式锁并没有真实锁住。问题就是出现在最外层的 @Transactional注解上,最外层调用代码完全没有必要加了@Transactional。
因为最内层已经加了手动控制事务的控制。拿掉外层的@Transactional再跑压力测试一切正常。
三、分析代码逻辑结构
1、Redis java客户端不推荐使用jedis,特别是2.9以上的版本,代码没有处理好很容易搞把redis服务搞死(连接数打满),推荐使用redisson代替,性能高且内置实现连接池。
如果真的一定要用jedis使用2.9以下版本并使用 lua脚本来控制,才能实现真正原子性操作。
2、事务注解@Transactional事务控制,嵌套使用时要注意,尽量控制在最小单元的最内层使用,在最外层(大方法)使用有风险,特别是跟锁一起使用时要注意控制两者顺序。
另外@Transactional在分布式环境下,远程调用无效的,并不能当作分布式事务来对待,这个业内有成熟其他方案替代。
3、其实取号生成序列号服务,没有必要使用数据库当作序列计数,完全可以使用redis计数器做实现(内部版本2年前就用redis版本来取号,连续运行两2年多没有发现有啥问题)。
示例代码如下:
private String generateByRedis(BusinessCodeCondition condition) { String time = ""; String prefix = ""; String type = condition.getType(); StringBuilder sb = new StringBuilder(); sb.append(type); if (StringUtils.isNotEmpty(condition.getShopCode())) { sb.append(condition.getShopCode()); } if (StringUtils.isNotEmpty(condition.getDateFormat())) { time = DateFormatUtil.formatDate(condition.getDateFormat(), new Date()); sb.append(time); } prefix = sb.toString(); sb.setLength(0); String key = KEY_PREFIX + prefix; //long n = redisTemplate.opsForValue().increment(key, 1L); //redisTemplate.expire(key, EXPIRE_TIME, TimeUnit.DAYS); long n = getIncrementNum(key, condition.getDateFormat()); return String.format("%s%0" + condition.getDigitCount() + "d", prefix, n); } //使用redis计数器取序列,每天过期前一天的key值回收。 private Long getIncrementNum(String key, String dateFormat) { RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); Long counter = entityIdCounter.incrementAndGet(); if ((null == counter || counter.longValue() == 1)) { if (StringUtils.isNotEmpty(dateFormat) && dateFormat.indexOf("yyMMdd") > -1) { entityIdCounter.expire(EXPIRE_TIME, TimeUnit.DAYS); } } return counter; }