自定义注解+Spel实现分布式锁方式
作者:皮皮熙のFans
自定义注解+Spel实现分布式锁
依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.4</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.3.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
RedisLockRegistryConfig
package com.example.demo.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.redis.util.RedisLockRegistry; @Configuration public class RedisLockRegistryConfig { /** * 默认过期时间300s */ @Value("${distribute.lock.expireTime:300}") private long expireTime; @Value("${spring.application.name:'distributeLock'}") private String registryKey; @Bean public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory){ return new RedisLockRegistry(factory, registryKey, expireTime * 1000); } }
自定义注解
package com.example.demo.aop.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface DistributeLock { String name() default ""; }
自定义切面
package com.example.demo.aop; import com.example.demo.aop.annotation.DistributeLock; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.annotation.Order; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.redis.util.RedisLockRegistry; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Objects; import java.util.concurrent.locks.Lock; @Aspect @Order @Component public class DistributeLockAop { private static final Logger LOGGER = LoggerFactory.getLogger(DistributeLockAop.class); private static SpelExpressionParser parser = new SpelExpressionParser(); private static DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer(); private RedisLockRegistry redisLockRegistry; public DistributeLockAop(RedisLockRegistry redisLockRegistry) { this.redisLockRegistry = redisLockRegistry; } @Around("@annotation(com.example.demo.aop.annotation.DistributeLock)") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { Class<?> clazz = joinPoint.getTarget().getClass(); MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Method method = clazz.getDeclaredMethod(methodSignature.getName(), methodSignature.getParameterTypes()); DistributeLock distributeLock = AnnotationUtils.findAnnotation(method, DistributeLock.class); assert distributeLock != null; String spel = distributeLock.name(); String lockName = generateKeyBySpEL(spel, method, joinPoint.getArgs()); Lock lock = redisLockRegistry.obtain(lockName); if (lock.tryLock()) { LOGGER.info("DistributeLock locked Success. key:{}", lockName); return joinPoint.proceed(); } else { LOGGER.error("DistributeLock locked Failure. key:{}", lockName); throw new Exception("Lock failure"); } } public static String generateKeyBySpEL(String spELString, Method method, Object[] args) { String[] paramNames = discoverer.getParameterNames(method); Expression expression = parser.parseExpression(spELString); EvaluationContext context = new StandardEvaluationContext(); for (int i = 0; i < args.length; i++) { assert paramNames != null; context.setVariable(paramNames[i], args[i]); } return Objects.requireNonNull(expression.getValue(context)).toString(); } }
测试类
package com.example.demo.base; import com.example.demo.aop.annotation.DistributeLock; import org.springframework.stereotype.Service; @Service public class SomeService { @DistributeLock(name = "'lock:' + #something.name") public void doSomething(Something something) { } }
package com.example.demo.base; public class Something { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } }
package com.example.demo; import com.example.demo.base.Something; import com.example.demo.base.SomeService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args); SomeService someService = context.getBean("someService", SomeService.class); Something something = new Something(); something.setName("gogogo"); someService.doSomething(something); } }
执行结果
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.4)2021-03-25 16:27:49.638 INFO 10492 --- [ main] com.example.demo.DemoApplication : Starting DemoApplication using Java 1.8.0_141 on P80320948 with PID 10492 (D:\workspace\demo\target\classes started by 80320948 in D:\workspace\demo)
2021-03-25 16:27:49.641 INFO 10492 --- [ main] com.example.demo.DemoApplication : No active profile set, falling back to default profiles: default
2021-03-25 16:27:50.006 INFO 10492 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode!
2021-03-25 16:27:50.008 INFO 10492 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Redis repositories in DEFAULT mode.
2021-03-25 16:27:50.028 INFO 10492 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 6 ms. Found 0 Redis repository interfaces.
2021-03-25 16:27:50.144 INFO 10492 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-03-25 16:27:50.153 INFO 10492 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-03-25 16:27:50.156 INFO 10492 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-03-25 16:27:50.253 INFO 10492 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-03-25 16:27:50.329 INFO 10492 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-03-25 16:27:50.330 INFO 10492 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-03-25 16:27:50.832 INFO 10492 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-03-25 16:27:50.872 INFO 10492 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-03-25 16:27:50.872 INFO 10492 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'demo.errorChannel' has 1 subscriber(s).
2021-03-25 16:27:50.872 INFO 10492 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-03-25 16:27:50.878 INFO 10492 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 1.756 seconds (JVM running for 2.569)
2021-03-25 16:27:51.545 INFO 10492 --- [ main] com.example.demo.aop.DistributeLockAop : DistributeLock locked Success. key:lock:gogogo
基于注解的方式实现分布式锁
分布式锁的实现有两种方法
- 基于redis
- 基于zookeeper
为了方便分布式锁的使用, 基于注解的方式抽取成公用组件
DisLock注解
/** * 分布式锁的注解, 通过指定key作为分布式锁的key * * @author wang.js on 2019/1/29. * @version 1.0 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface DisLock { /** * 分布式锁的key * * @return */ String key(); /** * 分布式锁用的业务场景id * * @return */ String biz(); /** * 过期时间, 默认是5秒 * 单位是秒 * * @return */ int expireTime() default 5; }
处理DisLock的切面
/** * 处理@DisLock注解的切面 * * @author wang.js on 2019/1/29. * @version 1.0 */ @Aspect @Order(value = 1) @Component public class DisLockAspect { @Resource private DisLockUtil disLockUtil; private static final int MIN_EXPIRE_TIME = 3; @Around(value = "@annotation(disLock)") public Object execute(ProceedingJoinPoint proceedingJoinPoint, DisLock disLock) throws Throwable { int expireTIme = disLock.expireTime() < MIN_EXPIRE_TIME ? MIN_EXPIRE_TIME : disLock.expireTime(); String disKey = CacheKeyParser.parse(proceedingJoinPoint, disLock.key(), disLock.biz()); boolean lock = disLockUtil.lock(disKey, expireTIme); int count = 1; while (!lock && count < MIN_EXPIRE_TIME) { lock = disLockUtil.lock(disKey, expireTIme); count++; TimeUnit.SECONDS.sleep(1); } Object proceed; if (lock) { // 允许查询 try { proceed = proceedingJoinPoint.proceed(); } finally { // 删除分布式锁 disLockUtil.unlock(disKey, false); } } else { throw new CustomException(ErrorCodeEnum.DUPLICATE_REQUEST.getMessage()); } return proceed; } }
redis的配置
/** * @author wang.js * @date 2018/12/17 * @copyright yougou.com */ @Configuration public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port:6379}") private Integer port; @Bean public JedisPool jedisPool() { //1.设置连接池的配置对象 JedisPoolConfig config = new JedisPoolConfig(); //设置池中最大连接数 config.setMaxTotal(50); //设置空闲时池中保有的最大连接数 config.setMaxIdle(10); config.setMaxWaitMillis(3000L); config.setTestOnBorrow(true); //2.设置连接池对象 return new JedisPool(config,host,port); } }
redis分布式锁的实现
/** * redis分布式锁的实现 * * @author wang.js * @date 2018/12/18 * @copyright yougou.com */ @Component public class DisLockUtil { @Resource private JedisPool jedisPool; private static final int DEFAULT_EXPIRE_TIME = 5; private static final Long RELEASE_SUCCESS = 1L; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * 尝试获取分布式锁 * * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } /** * 释放锁 * * @param key * @return */ public final boolean unlock(String key, boolean needCheck) { boolean result = false; Jedis jedis = jedisPool.getResource(); try { if (needCheck) { String expireTimeCache = jedis.get(key); // 判断锁是否过期了 if (StringUtils.isBlank(expireTimeCache)) { result = true; } if (System.currentTimeMillis() - Long.parseLong(expireTimeCache) > 0) { // 直接删除 jedis.del(key); result = true; } } else { jedis.del(key); } } finally { jedis.close(); } return result; } /** * 获取分布式锁 * * @param key * @param expireSecond * @return */ public final boolean lock(String key, int expireSecond) { if (StringUtils.isBlank(key)) { throw new RuntimeException("传入的key为空"); } expireSecond = expireSecond == 0 ? DEFAULT_EXPIRE_TIME : expireSecond; // 过期的时候的时间戳 long expireTime = System.currentTimeMillis() + expireSecond * 1000 + 1; boolean setResult = false; Jedis jedis = jedisPool.getResource(); try { if (jedis.setnx(key, String.valueOf(expireTime)) == 1) { // 说明加锁成功 setResult = true; } if (jedis.ttl(key) < 0) { jedis.expire(key, expireSecond); } if (setResult) { return true; } String expireTimeCache = jedis.get(key); System.out.println(expireTimeCache + "====" + jedis.ttl(key) + ", now:" + System.currentTimeMillis()); // 判断锁是否过期了 if (StringUtils.isNotBlank(expireTimeCache) && System.currentTimeMillis() - Long.parseLong(expireTimeCache) > 0) { String oldExpireTime = jedis.getSet(key, String.valueOf(expireTime)); if (StringUtils.isNotBlank(oldExpireTime) && oldExpireTime.equals(String.valueOf(expireTime))) { jedis.expire(key, expireSecond); setResult = true; } } } finally { jedis.close(); } return setResult; } }
实现分布式锁的关键是对key的设置, 需要获取实际的参数来设置分布式锁, 这里自定义了解析器
/** * cache key 的解析器 * * @author wang.js on 2019/2/27. * @version 1.0 */ public class CacheKeyParser { /** * 解析缓存的key * * @param proceedingJoinPoint 切面 * @param cacheKey 缓存的key * @param biz 业务 * @return String * @throws IllegalAccessException 异常 */ public static String parse(ProceedingJoinPoint proceedingJoinPoint, String cacheKey, String biz) throws IllegalAccessException { // 解析实际参数的key String key = cacheKey.replace("#", ""); StringTokenizer stringTokenizer = new StringTokenizer(key, "."); Map<String, Object> nameAndValue = getNameAndValue(proceedingJoinPoint); Object actualKey = null; while (stringTokenizer.hasMoreTokens()) { if (actualKey == null) { actualKey = nameAndValue.get(stringTokenizer.nextToken()); } else { actualKey = getPropValue(actualKey, stringTokenizer.nextToken()); } } return biz + actualKey; } /** * 获取参数Map集合 * * @param joinPoint 切面 * @return Map<String, Object> */ private static Map<String, Object> getNameAndValue(ProceedingJoinPoint joinPoint) { Object[] paramValues = joinPoint.getArgs(); String[] paramNames = ((CodeSignature) joinPoint.getSignature()).getParameterNames(); Map<String, Object> param = new HashMap<>(paramNames.length); for (int i = 0; i < paramNames.length; i++) { param.put(paramNames[i], paramValues[i]); } return param; } /** * 获取指定参数名的参数值 * * @param obj * @param propName * @return * @throws IllegalAccessException */ public static Object getPropValue(Object obj, String propName) throws IllegalAccessException { Field[] fields = obj.getClass().getDeclaredFields(); for (Field f : fields) { if (f.getName().equals(propName)) { //在反射时能访问私有变量 f.setAccessible(true); return f.get(obj); } } return null; } }
ErrorCodeEnum
public enum ErrorCodeEnum { SUCCESS("查询成功", "200"), SERVER_ERROR("服务器异常", "500"), SECKILL_END("秒杀活动已结束", "250"), GOODS_KILLED("秒杀成功", "502"), ERROR_SIGN("签名不合法", "260"), UPDATE_SUCCESS("更新成功", "0"), SAVE_SUCCESS("保存成功", "0"), UPDATE_FAIL("更新失败", "256"), EMPTY_PARAM("参数为空", "257"), SAVE_ERROR("保存失败", "262"), SERVER_TIMEOUT("调用超时", "501"), USER_NOT_FOUND("找不到用户", "502"), COUPON_NOT_FOUND("找不到优惠券", "503"), DUPLICATE("出现重复", "504"), USER_STATUS_ABNORMAL("用户状态异常", "505"), NO_TOKEN("无token,请重新登录", "506"), ERROR_TOKEN("token不合法", "507"), EMPTY_RESULT("暂无数据", "508"), DUPLICATE_REQUEST("重复请求", "509"), ; /** * 定义的message */ private String message; /** * 定义的错误码 */ private String errCode; ErrorCodeEnum(String message, String errCode) { this.message = message; this.errCode = errCode; } public String getMessage() { return message; } protected void setMessage(String message) { this.message = message; } public String getErrCode() { return errCode; } protected void setErrCode(String errCode) { this.errCode = errCode; } }
自定义异常CustomException
/** * @author Eric on 2018/12/24. * @version 1.0 */ @Data @NoArgsConstructor @AllArgsConstructor @Accessors(chain = true) @EqualsAndHashCode(callSuper = true) public class CustomException extends RuntimeException { private String message; }
配置文件
spring: redis: host: mini7 port: 6379
测试
定义一个方法, 加上@RedisCache注解, cacheKey的值必须是#实际参数名.属性名的格式, 如果想要成其他的格式可以修改CacheKeyParser中的parse方法
@DisLock(key = "#id", biz = CommonBizConstant.SECOND_KILL) @Override public String testRedisCache(String id) { LOGGER.info("调用方法获取值"); return "大傻逼"; }
在springboot启动类上加上@ComponentScan({“com.eric”})
/** * @author Eric on 2019/1/26. * @version 1.0 */ @SpringBootApplication @MapperScan("com.eric.base.data.dao") @ComponentScan({"com.eric"}) @EnableFeignClients @EnableDiscoveryClient public class BaseDataApplication { public static void main(String[] args) { SpringApplication.run(BaseDataApplication.class, args); } }
写个测试类调用上面的方法
/** * 基础数据 * * @author wang.js on 2019/2/27. * @version 1.0 */ @SpringBootTest @RunWith(SpringRunner.class) public class BaseDataTest { @Resource private SysDictService sysDictService; @Test public void t1() { for (int i = 0; i < 100; i++) { sysDictService.testRedisCache("1"); } } }
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。