Redis实现幂等、防抖、限流等功能

本文章主要讲述如何使用Redis实现幂等、防抖、限流等功能。

幂等组件

import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;  import java.util.Objects; import java.util.concurrent.TimeUnit;  /**  * 消息队列幂等处理器  */ @Component @RequiredArgsConstructor public class MessageQueueIdempotentHandler {      private final StringRedisTemplate stringRedisTemplate;      private static final String IDEMPOTENT_KEY_PREFIX = "xxx:idempotent:";      /**      * 判断当前消息是否消费过      *      * @param messageId 消息唯一标识      * @return 消息是否消费过      */     public boolean isMessageBeingConsumed(String messageId) {         String key = IDEMPOTENT_KEY_PREFIX + messageId;         return Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));     }      /**      * 判断消息消费流程是否执行完成      *      * @param messageId 消息唯一标识      * @return 消息是否执行完成      */     public boolean isAccomplish(String messageId) {         String key = IDEMPOTENT_KEY_PREFIX + messageId;         return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");     }      /**      * 设置消息流程执行完成      *      * @param messageId 消息唯一标识      */     public void setAccomplish(String messageId) {         String key = IDEMPOTENT_KEY_PREFIX + messageId;         stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);     }      /**      * 如果消息处理遇到异常情况,删除幂等标识      *      * @param messageId 消息唯一标识      */     public void delMessageProcessed(String messageId) {         String key = IDEMPOTENT_KEY_PREFIX + messageId;         stringRedisTemplate.delete(key);     } } 
@Component @RocketMQMessageListener(consumerGroup = "saaslink_consumer_group", topic = RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY) @Slf4j public class ShortLinkStatsSaveConsumer implements RocketMQListener<MessageExt> {     @Override     public void onMessage(MessageExt msgExt) {         String msgId = msgExt.getMsgId();         // 使用redis实现幂等         if (messageQueueIdempotentHandler.isMessageBeingConsumed(msgId.toString())) {             // 判断当前的这个消息流程是否执行完成             if (messageQueueIdempotentHandler.isAccomplish(msgId.toString())) {                 return;             }             throw new ServiceException("消息未完成流程,需要消息队列重试");         }         try {             byte[] msgExtBody = msgExt.getBody();             // 转为map             Map<String, String> producerMap = JSON.parseObject(msgExtBody, Map.class);             ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);             // 实际新增的逻辑             } catch (Throwable ex) {             // 某某某情况宕机了             messageQueueIdempotentHandler.delMessageProcessed(msgId.toString());             log.error("记录短链接监控消费异常", ex);             throw ex;         }         messageQueueIdempotentHandler.setAccomplish(msgId.toString());     } }  

防抖组件

幂等注解,防止用户重复提交表单信息,主要是通过分布式锁实现。

import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;  /**  * 幂等注解,防止用户重复提交表单信息  */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface NoDuplicateSubmit {      /**      * 触发幂等失败逻辑时,返回的错误提示信息      */     String message() default "您操作太快,请稍后再试"; } 
import cn.hutool.crypto.digest.DigestUtil; import com.alibaba.fastjson2.JSON; import com.nageoffer.onecoupon.framework.exception.ClientException; import lombok.RequiredArgsConstructor; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes;  import java.lang.reflect.Method;  /**  * 防止用户重复提交表单信息切面控制器  */ @Aspect @RequiredArgsConstructor public final class NoDuplicateSubmitAspect {      private final RedissonClient redissonClient;      /**      * 增强方法标记 {@link NoDuplicateSubmit} 注解逻辑      */     @Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoDuplicateSubmit)")     public Object noDuplicateSubmit(ProceedingJoinPoint joinPoint) throws Throwable {         NoDuplicateSubmit noDuplicateSubmit = getNoDuplicateSubmitAnnotation(joinPoint);         // 获取分布式锁标识         String lockKey = String.format("no-duplicate-submit:path:%s:currentUserId:%s:md5:%s", getServletPath(), getCurrentUserId(), calcArgsMD5(joinPoint));         RLock lock = redissonClient.getLock(lockKey);         // 尝试获取锁,获取锁失败就意味着已经重复提交,直接抛出异常         if (!lock.tryLock()) {             throw new ClientException(noDuplicateSubmit.message());         }         Object result;         try {             // 执行标记了防重复提交注解的方法原逻辑             result = joinPoint.proceed();         } finally {             lock.unlock();         }         return result;     }      /**      * @return 返回自定义防重复提交注解      */     public static NoDuplicateSubmit getNoDuplicateSubmitAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {         MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();         Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());         return targetMethod.getAnnotation(NoDuplicateSubmit.class);     }      /**      * @return 获取当前线程上下文 ServletPath      */     private String getServletPath() {         ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();         return sra.getRequest().getServletPath();     }      /**      * @return 当前操作用户 ID      */     private String getCurrentUserId() {         // 从UserConText中获取         return "xxx";     }      /**      * @return joinPoint md5      */     private String calcArgsMD5(ProceedingJoinPoint joinPoint) {         return DigestUtil.md5Hex(JSON.toJSONBytes(joinPoint.getArgs()));     } 

限流组件

Sentinel进行限流

/**  * 初始化限流配置  */ @Component public class SentinelRuleConfig implements InitializingBean {      @Override     public void afterPropertiesSet() throws Exception {         List<FlowRule> rules = new ArrayList<>();         FlowRule createOrderRule = new FlowRule();         createOrderRule.setResource("xxx");         createOrderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);         createOrderRule.setCount(1);         rules.add(createOrderRule);         FlowRuleManager.loadRules(rules);     } } 
/**  * 自定义流控策略  */ public class CustomBlockHandler {      public static Result<ShortLinkCreateRespDTO> createShortLinkBlockHandlerMethod(ShortLinkCreateReqDTO requestParam, BlockException exception) {         return new Result<ShortLinkCreateRespDTO>().setCode("B100000").setMessage("当前访问网站人数过多,请稍后再试...");     } } 
    @PostMapping("/api/xxx/v1/create")     @SentinelResource(             value = "xxx",             blockHandler = "createShortLinkBlockHandlerMethod",             blockHandlerClass = CustomBlockHandler.class     )     public Result<ShortLinkCreateRespDTO> create(@RequestBody CreateReqDTO requestParam) {         return Results.success(Service.create(requestParam));     } 

Redis限流组件

通过lua脚本,判断1s以内的并发请求数是否超过我们的预期,如果超过我们的预计就进行限制。

-- 设置用户访问频率限制的参数 local username = KEYS[1] local timeWindow = tonumber(ARGV[1]) -- 时间窗口,单位:秒  -- 构造 Redis 中存储用户访问次数的键名 local accessKey = "short-link:user-flow-risk-control:" .. username  -- 原子递增访问次数,并获取递增后的值 local currentAccessCount = redis.call("INCR", accessKey)  -- 设置键的过期时间 if currentAccessCount == 1 then     redis.call("EXPIRE", accessKey, timeWindow) end  -- 返回当前访问次数 return currentAccessCount 
/**  * 用户操作流量风控配置文件  */ @Data @Component @ConfigurationProperties(prefix = "xxx.flow-limit") public class UserFlowRiskControlConfiguration {      /**      * 是否开启用户流量风控验证      */     private Boolean enable;      /**      * 流量风控时间窗口,单位:秒      */     private String timeWindow;      /**      * 流量风控时间窗口内可访问次数      */     private Long maxAccessCount; } 
xxx:   group:     max-num: 20   flow-limit:     enable: true     time-window: 1     max-access-count: 20 
import com.alibaba.fastjson2.JSON; import com.cmk.saaslink.admin.config.common.UserFlowRiskControlConfiguration; import com.cmk.saaslink.common.convention.biz.user.UserContext; import com.cmk.saaslink.common.convention.exception.ClientException; import com.cmk.saaslink.common.convention.result.Results; import com.google.common.collect.Lists; import jakarta.servlet.*; import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.ResourceScriptSource;  import java.io.IOException; import java.io.PrintWriter; import java.util.Optional;  import static com.cmk.saaslink.common.convention.errorcode.BaseErrorCode.FLOW_LIMIT_ERROR;   /**  * 用户操作流量风控过滤器  */ @Slf4j @RequiredArgsConstructor public class UserFlowRiskControlFilter implements Filter {      private final StringRedisTemplate stringRedisTemplate;     private final UserFlowRiskControlConfiguration userFlowRiskControlConfiguration;      private static final String USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH = "lua/user_flow_risk_control.lua";      @SneakyThrows     @Override     public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {         DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();         redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(USER_FLOW_RISK_CONTROL_LUA_SCRIPT_PATH)));         redisScript.setResultType(Long.class);         String username = Optional.ofNullable(UserContext.getUsername()).orElse("other");         Long result;         try {             result = stringRedisTemplate.execute(redisScript, Lists.newArrayList(username), userFlowRiskControlConfiguration.getTimeWindow());         } catch (Throwable ex) {             log.error("执行用户请求流量限制LUA脚本出错", ex);             returnJson((HttpServletResponse) response, JSON.toJSONString(Results.failure(new ClientException(FLOW_LIMIT_ERROR))));             return;         }         if (result == null || result > userFlowRiskControlConfiguration.getMaxAccessCount()) {             returnJson((HttpServletResponse) response, JSON.toJSONString(Results.failure(new ClientException(FLOW_LIMIT_ERROR))));             return;         }         filterChain.doFilter(request, response);     }      private void returnJson(HttpServletResponse response, String json) throws Exception {         response.setCharacterEncoding("UTF-8");         response.setContentType("text/html; charset=utf-8");         try (PrintWriter writer = response.getWriter()) {             writer.print(json);         }     } } 

发表评论

相关文章