本文主要讲述如何基于 Redis(限流部分同时介绍 Sentinel)实现幂等、防抖、限流等功能。
幂等组件
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Redis-backed idempotency helper for message-queue consumers.
 *
 * <p>Each message id maps to one Redis key whose value is {@code "0"} while the
 * message is being processed and {@code "1"} once processing has finished.
 * Every write uses a 2-minute expiry.
 */
@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {

    private static final String IDEMPOTENT_KEY_PREFIX = "xxx:idempotent:";

    /** Marker value stored while a message is still being processed. */
    private static final String IN_PROGRESS = "0";

    /** Marker value stored once a message has been fully processed. */
    private static final String ACCOMPLISHED = "1";

    /** Lifetime of the idempotency marker, in minutes. */
    private static final long MARKER_TTL_MINUTES = 2;

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * Tries to claim the message by creating its marker only if it is absent.
     *
     * @param messageId unique message identifier
     * @return {@code true} when the marker already existed (the message was claimed
     *         before this call); {@code false} when this call created the marker or
     *         Redis returned no result
     */
    public boolean isMessageBeingConsumed(String messageId) {
        Boolean created = stringRedisTemplate.opsForValue()
                .setIfAbsent(keyOf(messageId), IN_PROGRESS, MARKER_TTL_MINUTES, TimeUnit.MINUTES);
        return Boolean.FALSE.equals(created);
    }

    /**
     * Checks whether the message's marker says processing has finished.
     *
     * @param messageId unique message identifier
     * @return {@code true} only when the stored marker equals {@code "1"}
     */
    public boolean isAccomplish(String messageId) {
        String marker = stringRedisTemplate.opsForValue().get(keyOf(messageId));
        return ACCOMPLISHED.equals(marker);
    }

    /**
     * Marks the message as fully processed and renews the marker's expiry.
     *
     * @param messageId unique message identifier
     */
    public void setAccomplish(String messageId) {
        stringRedisTemplate.opsForValue()
                .set(keyOf(messageId), ACCOMPLISHED, MARKER_TTL_MINUTES, TimeUnit.MINUTES);
    }

    /**
     * Removes the marker so that a failed message can be consumed again.
     *
     * @param messageId unique message identifier
     */
    public void delMessageProcessed(String messageId) {
        stringRedisTemplate.delete(keyOf(messageId));
    }

    /** Builds the Redis key that holds the marker for the given message id. */
    private static String keyOf(String messageId) {
        return IDEMPOTENT_KEY_PREFIX + messageId;
    }
}
@Component @RocketMQMessageListener(consumerGroup = "saaslink_consumer_group", topic = RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY) @Slf4j public class ShortLinkStatsSaveConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt msgExt) { String msgId = msgExt.getMsgId(); // 使用redis实现幂等 if (messageQueueIdempotentHandler.isMessageBeingConsumed(msgId.toString())) { // 判断当前的这个消息流程是否执行完成 if (messageQueueIdempotentHandler.isAccomplish(msgId.toString())) { return; } throw new ServiceException("消息未完成流程,需要消息队列重试"); } try { byte[] msgExtBody = msgExt.getBody(); // 转为map Map<String, String> producerMap = JSON.parseObject(msgExtBody, Map.class); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); // 实际新增的逻辑 } catch (Throwable ex) { // 某某某情况宕机了 messageQueueIdempotentHandler.delMessageProcessed(msgId.toString()); log.error("记录短链接监控消费异常", ex); throw ex; } messageQueueIdempotentHandler.setAccomplish(msgId.toString()); } }
防抖组件
幂等注解,防止用户重复提交表单信息,主要是通过分布式锁实现。
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a method as protected against duplicate form submissions.
 *
 * <p>Applicable to methods only and retained at runtime so that it can be read
 * reflectively.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoDuplicateSubmit {

    /**
     * Error message returned when a submission is rejected as a duplicate.
     */
    String message() default "您操作太快,请稍后再试";
}
import cn.hutool.crypto.digest.DigestUtil; import com.alibaba.fastjson2.JSON; import com.nageoffer.onecoupon.framework.exception.ClientException; import lombok.RequiredArgsConstructor; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import java.lang.reflect.Method; /** * 防止用户重复提交表单信息切面控制器 */ @Aspect @RequiredArgsConstructor public final class NoDuplicateSubmitAspect { private final RedissonClient redissonClient; /** * 增强方法标记 {@link NoDuplicateSubmit} 注解逻辑 */ @Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoDuplicateSubmit)") public Object noDuplicateSubmit(ProceedingJoinPoint joinPoint) throws Throwable { NoDuplicateSubmit noDuplicateSubmit = getNoDuplicateSubmitAnnotation(joinPoint); // 获取分布式锁标识 String lockKey = String.format("no-duplicate-submit:path:%s:currentUserId:%s:md5:%s", getServletPath(), getCurrentUserId(), calcArgsMD5(joinPoint)); RLock lock = redissonClient.getLock(lockKey); // 尝试获取锁,获取锁失败就意味着已经重复提交,直接抛出异常 if (!lock.tryLock()) { throw new ClientException(noDuplicateSubmit.message()); } Object result; try { // 执行标记了防重复提交注解的方法原逻辑 result = joinPoint.proceed(); } finally { lock.unlock(); } return result; } /** * @return 返回自定义防重复提交注解 */ public static NoDuplicateSubmit getNoDuplicateSubmitAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes()); return targetMethod.getAnnotation(NoDuplicateSubmit.class); } /** * @return 获取当前线程上下文 ServletPath */ private String 
getServletPath() { ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); return sra.getRequest().getServletPath(); } /** * @return 当前操作用户 ID */ private String getCurrentUserId() { // 从UserConText中获取 return "xxx"; } /** * @return joinPoint md5 */ private String calcArgsMD5(ProceedingJoinPoint joinPoint) { return DigestUtil.md5Hex(JSON.toJSONBytes(joinPoint.getArgs())); }
限流组件
Sentinel进行限流
/**
 * Loads the Sentinel flow-control rules once the bean's properties have been set.
 */
@Component
public class SentinelRuleConfig implements InitializingBean {

    /**
     * Registers a single QPS-based rule for resource {@code "xxx"} with a threshold of 1.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        FlowRule qpsRule = new FlowRule();
        qpsRule.setResource("xxx");
        qpsRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        qpsRule.setCount(1);

        List<FlowRule> flowRules = new ArrayList<>();
        flowRules.add(qpsRule);
        FlowRuleManager.loadRules(flowRules);
    }
}
/**
 * Holder for custom responses returned when a request is blocked by flow control.
 */
public class CustomBlockHandler {

    /**
     * Builds the response returned in place of the blocked call.
     *
     * @param requestParam the request payload of the blocked call (unused)
     * @param exception    the block exception describing why the call was rejected (unused)
     * @return a result carrying code {@code B100000} and a "try again later" message
     */
    public static Result<ShortLinkCreateRespDTO> createShortLinkBlockHandlerMethod(ShortLinkCreateReqDTO requestParam, BlockException exception) {
        Result<ShortLinkCreateRespDTO> blocked = new Result<>();
        blocked.setCode("B100000");
        blocked.setMessage("当前访问网站人数过多,请稍后再试...");
        return blocked;
    }
}
/**
 * POST endpoint registered as Sentinel resource {@code "xxx"}. When the call is blocked,
 * {@code createShortLinkBlockHandlerMethod} in {@link CustomBlockHandler} is named as the handler.
 *
 * @param requestParam request body of the create call
 * @return the creation result wrapped by {@code Results.success}
 */
// NOTE(review): the block handler declares its first parameter as ShortLinkCreateReqDTO while this
// method takes CreateReqDTO; confirm the two signatures line up, otherwise the handler may not be resolved.
@PostMapping("/api/xxx/v1/create")
@SentinelResource(
        value = "xxx",
        blockHandler = "createShortLinkBlockHandlerMethod",
        blockHandlerClass = CustomBlockHandler.class
)
public Result<ShortLinkCreateRespDTO> create(@RequestBody CreateReqDTO requestParam) {
    return Results.success(Service.create(requestParam));
}
Redis限流组件
通过 Lua 脚本原子地统计时间窗口(例如 1s)内同一用户的请求次数,如果次数超过配置的阈值,就对该用户进行限流。
-- Per-user access counter.
--   KEYS[1]  user name the counter belongs to
--   ARGV[1]  length of the time window, in seconds
-- Returns the access count after this call has been recorded.
local windowSeconds = tonumber(ARGV[1])

-- Redis key that stores this user's access count.
local counterKey = "short-link:user-flow-risk-control:" .. KEYS[1]

-- Increment the counter and read back the new value.
local hits = redis.call("INCR", counterKey)

-- The first hit creates the key, so that is when the expiry is set.
if hits == 1 then
    redis.call("EXPIRE", counterKey, windowSeconds)
end

return hits
/**
 * Configuration properties for per-user flow risk control, bound from the
 * {@code xxx.flow-limit} prefix.
 */
@Data
@Component
@ConfigurationProperties(prefix = "xxx.flow-limit")
public class UserFlowRiskControlConfiguration {

    /**
     * Whether per-user flow risk control is enabled.
     */
    private Boolean enable;

    /**
     * Length of the flow-control time window, in seconds (held as a string).
     */
    private String timeWindow;

    /**
     * Maximum number of accesses allowed within one time window.
     */
    private Long maxAccessCount;
}
xxx:
  group:
    max-num: 20
  flow-limit:
    enable: true
    time-window: 1
    max-access-count: 20
import com.alibaba.fastjson2.JSON; import com.cmk.saaslink.admin.config.common.UserFlowRiskControlConfiguration; import com.cmk.saaslink.common.convention.biz.user.UserContext; import com.cmk.saaslink.common.convention.exception.ClientException; import com.cmk.saaslink.common.convention.result.Results; import com.google.common.collect.Lists; import jakarta.servlet.*; import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.ResourceScriptSource; import java.io.IOException; import java.io.PrintWriter; import java.util.Optional; import static com.cmk.saaslink.common.convention.errorcode.BaseErrorCode.FLOW_LIMIT_ERROR; /** * 用户操作流量风控过滤器 */ @Slf4j @RequiredArgsConstructor public class UserFlowRiskControlFilter implements Filter { private final StringRedisTemplate stringRedisTemplate; private final UserFlowRiskControlConfiguration userFlowRiskControlConfiguration; private static final String USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH = "lua/user_flow_risk_control.lua"; @SneakyThrows @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH))); redisScript.setResultType(Long.class); String username = Optional.ofNullable(UserContext.getUsername()).orElse("other"); Long result; try { result = stringRedisTemplate.execute(redisScript, Lists.newArrayList(username), userFlowRiskControlConfiguration.getTimeWindow()); } catch (Throwable ex) { log.error("执行用户请求流量限制LUA脚本出错", ex); 
returnJson((HttpServletResponse) response, JSON.toJSONString(Results.failure(new ClientException(FLOW_LIMIT_ERROR)))); return; } if (result == null || result > userFlowRiskControlConfiguration.getMaxAccessCount()) { returnJson((HttpServletResponse) response, JSON.toJSONString(Results.failure(new ClientException(FLOW_LIMIT_ERROR)))); return; } filterChain.doFilter(request, response); } private void returnJson(HttpServletResponse response, String json) throws Exception { response.setCharacterEncoding("UTF-8"); response.setContentType("text/html; charset=utf-8"); try (PrintWriter writer = response.getWriter()) { writer.print(json); } } }