redis分布式锁以及自定义注解

  • Post author:
  • Post category:其他


1.同一个锁key,同一时间只能有一个客户端拿到锁,其他客户端会陷入无限的等待来尝试获取那个锁,只有获取到锁的客户端才能执行下面的业务逻辑。

2.引入依赖 aop redis

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

这里基于自定义注解引入spring的spel表达式 类似于

@Cacheable 缓存注解中参数

   <!--spel表达式-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
        </dependency>

3.具体锁业务方法

package org.xiaozhang.common.Lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

/**
 * @author Mr.Zhang
 * @version 1.0.0
 * @description:
 * @since 2023/3/27 17:30
 */
@Component
public class RedisLockService {
    private static Logger log = LoggerFactory.getLogger(RedisMutexAop.class);
    private static String UNLOCK_LUA;

    static {
        //如果存在删除返回true
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    private RedisTemplate redisTemplate;

    private RedisLockService(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * @param keyCode
     * @param keyValue
     * @param expire          key有效期
     * @param maxWaitLockTime 最大等待锁时间
     * @return
     */
    public boolean getLock(String keyCode, String keyValue, long expire, long maxWaitLockTime) {
        //最先开始时间
        long startTime = System.currentTimeMillis();
        return getLock(keyCode, keyValue, expire, startTime, maxWaitLockTime,0);
    }

    public boolean getLock(RedisMutexDo redisMutexDo) {
        //最先开始时间
        long startTime = System.currentTimeMillis();
        return getLock(redisMutexDo.getKeyCode(), redisMutexDo.getKeyValue(), Long.valueOf(redisMutexDo.getExprire()), startTime, Long.valueOf(redisMutexDo.getMaximumTime()),0);
    }

    /**
     * @param keyCode
     * @param keyValue
     * @param expire          key有效期
     * @param startTime       最先获取锁的开始时间戳(秒)
     * @param maxWaitLockTime 最大等待锁时间(秒)
     * @return
     */
    private boolean getLock(String keyCode, String keyValue, long expire, long startTime, long maxWaitLockTime,int executions) {
        log.info("当前线程:{}试图第{}次开始获取锁keyCode:{},keyValue:{}",Thread.currentThread().getName(),executions,keyCode,keyValue);
        //获取唯一互斥锁
        if (tryGetDistributedLock(keyCode, keyValue, expire)) {
            return true;
        } else {
            //等待时间超过n秒种直接返回
            long l = System.currentTimeMillis() - startTime;
            log.debug("{}等待时长为:{}",Thread.currentThread().getName(),l*0.001);
            if ((l)*0.001 > maxWaitLockTime) {
                System.out.println("当前资源拼缝,请重试");
                return false;
            }
            // 这里延迟2秒种在次获取
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                //异常都返回相同 资源异常
                return false;
            }
            //重新获取
            return getLock(keyCode, keyValue, expire, startTime,maxWaitLockTime,executions+1);
        }
    }

    /**
     * 获取keycode 当不存在时候返回true 并设置过期时间
     *
     * @param keyCode  key
     * @param keyValue 随便
     * @param expire   超时时间 seconds
     * @return
     */
    private boolean tryGetDistributedLock(String keyCode, String keyValue, long expire) {
        try {
            RedisCallback<Boolean> callback = (connection) -> {
                return connection.set(keyCode.getBytes(Charset.forName("UTF-8")), keyValue.getBytes(Charset.forName("UTF-8")), Expiration
                        .seconds(expire), RedisStringCommands.SetOption.SET_IF_ABSENT);
            };
            return (Boolean) this.redisTemplate.execute(callback);
        } catch (Exception var6) {
            System.out.println("set redis occured an exception" + var6);
            return false;
        }
    }

    /**
     * 释放锁
     */
    public boolean releaseDistributedLock(String keyCode, String keyValue) {
        try {
            RedisCallback<Boolean> callback = (redisConnection) -> {
                return (Boolean) redisConnection.eval(UNLOCK_LUA.getBytes(Charset.forName("UTF-8")), ReturnType.BOOLEAN, 1, new byte[][]{keyCode.getBytes(Charset.forName("UTF-8")), keyValue.getBytes(Charset.forName("UTF-8"))});
            };
            boolean var6 = (Boolean) this.redisTemplate.execute(callback);
            return var6;
        } catch (Exception var10) {
            return false;
        } finally {
            ;
        }
    }

    /**
     * 释放锁信息
     */
    public String getLockInfo(String keyCode) {
        try {
            RedisCallback<String> callback = (redisConnection) -> new String(redisConnection.get(keyCode.getBytes(Charset.forName("UTF-8"))));
            return (String) this.redisTemplate.execute(callback);
        } catch (Exception var10) {
            log.error("error get keyCode:{} Exception:{}", keyCode, var10);
            return "";
        }
    }
}

4.编写自定义注解

package org.xiaozhang.common.Lock;

import java.lang.annotation.*;

/**
 * @author Mr.Zhang
 * @version 1.0.0
 * @description: redis分布式互斥锁
 * @since 2023/3/27 16:12
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisMutexLock {
    /**
     * key
     * @return
     */
    String keyCode() default "";

    /**
     * keyValue
     * @return
     */
    String keyValue() default "";
    /**
     * key过期时间
     * @return  默认3分钟
     */
    String exprire() default "18000";

    /**
     * 最大等待时间 默认2分钟
     * @return
     */
    String maximumTime() default "12000";

}

4.2 创建参数对象

package org.xiaozhang.common.Lock;

/**
 * @author Mr.Zhang
 * @version 1.0.0
 * @description:
 * @since 2023/3/27 16:26
 */
public class RedisMutexDo {
    private String keyCode;
    private String keyValue;
    /**
     * 最大等待时间 默认2分钟(秒)
     * @return
     */
    private String exprire;
    /**
     * 最大等待时间 默认3分钟(秒)
     * @return
     */
    private String maximumTime;
    public RedisMutexDo(){}
    public RedisMutexDo(String keyCode, String keyValue) {
        this.keyCode = keyCode;
        this.keyValue = keyValue;
        this.exprire = "120";
        this.maximumTime = "180";
    }
    public RedisMutexDo(String keyCode, String keyValue, String exprire, String maximumTime) {
        this.keyCode = keyCode;
        this.keyValue = keyValue;
        this.exprire = exprire;
        this.maximumTime = maximumTime;
    }

    public String getKeyCode() {
        return keyCode;
    }

    public void setKeyCode(String keyCode) {
        this.keyCode = keyCode;
    }

    public String getKeyValue() {
        return keyValue;
    }

    public void setKeyValue(String keyValue) {
        this.keyValue = keyValue;
    }

    public String getExprire() {
        return exprire;
    }

    public void setExprire(String exprire) {
        this.exprire = exprire;
    }

    public String getMaximumTime() {
        return maximumTime;
    }

    public void setMaximumTime(String maximumTime) {
        this.maximumTime = maximumTime;
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer("RedisMutexDo{");
        sb.append("keyCode='").append(keyCode).append('\'');
        sb.append(", keyValue='").append(keyValue).append('\'');
        sb.append(", exprire='").append(exprire).append('\'');
        sb.append(", maximumTime='").append(maximumTime).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

4.3 创建aop切面类

package org.xiaozhang.common.Lock;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.CodeSignature;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.xiaozhang.common.exception.OrderException;

import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * @author Mr.Zhang
 * @version 1.0.0
 * @description:
 * @since 2023/3/27 16:13
 */
@Aspect
@Component
public class RedisMutexAop {
    private static Logger log = LoggerFactory.getLogger(RedisMutexAop.class);

    @Autowired
    private RedisLockService redisLockService;

    /***
     * 切入点
     */
    @Pointcut("@annotation(org.xiaozhang.common.Lock.RedisMutexLock)")
    public void pointCut() {
    }

    /*****
     * 环绕通知
     * @param point
     * @return
     */
    @Around("pointCut()")
    public Object aroundAdvice(ProceedingJoinPoint point) throws Throwable {
        Object obj = null;
        //获取方法参数
        Method month = getMonth(point);
        //获取map<参数名,参数值>
        Map<String, Object> param = getParam(point);
        //获取锁注解
        RedisMutexLock annotation = month.getAnnotation(RedisMutexLock.class);
        //锁的参数
        RedisMutexDo redisMutexDo = replaceRedisMutexLock(param, annotation);
        //获取锁
        if (redisLockService.getLock(redisMutexDo)) {
            log.info("当前线程{}获取锁,开始执行业务方法",Thread.currentThread().getName());
            //执行目标方法
            obj = point.proceed();
        } else {
            //不执行返回并抛出异常
            log.error("获取分布式锁失败");
            throw new OrderException("获取分布式锁失败");
        }
        if (redisLockService.releaseDistributedLock(redisMutexDo.getKeyCode(), redisMutexDo.getKeyValue())) {
            //释放锁
            log.info("当前线程{}成功释放锁",Thread.currentThread().getName());
        } else {
            //释放失败
            log.error("释放失败{}",redisLockService.getLockInfo(redisMutexDo.getKeyCode()));
        }
        return obj;
    }

    /**
     * 获取参数名和参数值
     * @param proceedingJoinPoint
     * @return
     */
    private Map<String, Object> getParam(ProceedingJoinPoint proceedingJoinPoint) {
        Map<String, Object> map = new HashMap<String, Object>();
        Object[] values = proceedingJoinPoint.getArgs();
        String[] names = ((CodeSignature) proceedingJoinPoint.getSignature()).getParameterNames();
        for (int i = 0; i < names.length; i++) {
            map.put(names[i], values[i]);
        }
        return map;
    }

    /**
     * 获取ProceedingJoinPoint 代理的方法对象
     *
     * @param point
     * @return
     */
    private Method getMonth(ProceedingJoinPoint point) {
        Class<?> targetCls = point.getTarget().getClass();
        //获取方法签名(通过此签名获取目标方法信息)
        MethodSignature ms = (MethodSignature) point.getSignature();
        //获取目标方法上的注解指定的操作名称
        Method targetMethod = null;
        try {
            targetMethod = targetCls.getDeclaredMethod(
                    ms.getName(),
                    ms.getParameterTypes());
        } catch (NoSuchMethodException e) {
            log.error("未找到方法异常{}", e);
        }
        return targetMethod;
    }

    /**
     * 获取当前执行方法中参数中包含#字段值或者RedisMutexDo对象中参数
     *
     * @param args
     * @param annotation
     * @return 返回锁的参数
     */
    private RedisMutexDo replaceRedisMutexLock(Map<String, Object> param, RedisMutexLock annotation) {
        RedisMutexDo redisMutexDo = new RedisMutexDo();
        // 创建ExpressionParser解析表达式
        ExpressionParser parser = new SpelExpressionParser();
        // 创建一个虚拟的容器EvaluationContext
        StandardEvaluationContext ctx = new StandardEvaluationContext();
        //遍历注解中参数属性
        SpelExpression keyCode= null;
        if (checkSignStart(annotation.keyCode(),"#")){
            keyCode = Optional.ofNullable(annotation.keyCode())
                    .map(k -> (SpelExpression) parser.parseExpression(annotation.keyCode())).orElse(null);
        }
        SpelExpression keyValue= null;
        if (checkSignStart(annotation.keyValue(),"#")){
            keyValue =  Optional.ofNullable(annotation.keyValue())
                    .map(k->(SpelExpression)parser.parseExpression(annotation.keyValue())).orElse(null);
        }
        SpelExpression exprire=null;
        if (checkSignStart(annotation.exprire(),"#")){
            exprire =   Optional.ofNullable(annotation.exprire()).map(k->k.contains("#"))
                    .map(k->(SpelExpression)parser.parseExpression(annotation.exprire())).orElse(null);
        }
        SpelExpression maximumTime= null;
        if (checkSignStart(annotation.maximumTime(),"#")){
            maximumTime= Optional.ofNullable(annotation.maximumTime()).map(k->k.contains("#"))
                    .map(k->(SpelExpression)parser.parseExpression(annotation.maximumTime())).orElse(null);
        }
        //遍历参数和对应值
        param.forEach((k,v)->{
            ctx.setVariable(k, v);
        });
        //赋值
        redisMutexDo.setKeyCode(keyCode!=null?(String) keyCode.getValue(ctx):annotation.keyCode());
        redisMutexDo.setKeyValue(keyValue!=null?(String) keyValue.getValue(ctx):annotation.keyValue());
        redisMutexDo.setExprire(exprire!=null?(String) exprire.getValue(ctx):annotation.exprire());
        redisMutexDo.setMaximumTime(maximumTime!=null?(String) maximumTime.getValue(ctx):annotation.maximumTime());
        return redisMutexDo;
    }

 

    /**
     * 检查字符串是否以符号开头
     * @param str 字符串
     * @param sign 符号
     * @return
     */
    private boolean checkSignStart(String str,String sign){
        if (str == null || "".equals(str)){
            return false;
        }
        //去除前后字符串
        if (str.trim().lastIndexOf(sign)==0){
            return true;
        }
        return false;
    }
}

5.开始业务测试 创建个业务方法并重写两种方式

  //车数
    public static int cs=100;

    @RedisMutexLock(keyCode = "#keyCode",keyValue = "#keyValue",exprire = "#exprire",maximumTime = "#maximumTime")
    public JsonResponse getLock(String keyCode,String keyValue,String exprire,String maximumTime){
        RedisMutexDo redisMutexDo = new RedisMutexDo(keyCode,keyValue,exprire,maximumTime);
        return this.getLock(redisMutexDo);
    }
    @RedisMutexLock(keyCode = "#redisMutexDo.keyCode",keyValue = "#redisMutexDo.keyValue",exprire = "#redisMutexDo.exprire",maximumTime = "#redisMutexDo.maximumTime")
    public JsonResponse getLock(RedisMutexDo redisMutexDo) {
        try {
            //睡个20秒
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            log.error("中断异常:{}",e);
        }
        log.info("获得了锁进入业务方法");
        cs=cs-1;
        return JsonResponse.ok(redisMutexDo).setMessage("抢到");
    }

5.2 开始用测试类去测试

这里一开始没有启动redis服务,现在处于连接失败状态,不过资源还在一直尝试抢锁,

2秒抢一次锁,最大等待时间/2=抢锁次数这里做了打印

180/2=大约重试锁60次,就会结束抛出异常不再执行业务方法。



版权声明:本文为qq_31259663原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。