MySql实现分布式锁的示例代码
作者:码农小伙
本篇我们使用mysql实现一个分布式锁。
环境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式锁的功能
1,分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作
2,锁具有重入的功能:即一个使用者可以多次获取某个锁
3,获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败
4,能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁
预备技能:乐观锁
通常我们修改表中一条数据过程如下:
t1:select获取记录R1 t2:对R1进行编辑 t3:update R1
我们来看一下上面的过程存在的问题:
如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。
我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:
t1:打开事务start transaction t2:select获取记录R1,声明变量v=R1.version t3:对R1进行编辑 t4:执行更新操作 update R1 set version = version + 1 where user_id=#user_id# and version = #v#; t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚 if(count==1){ //提交事务 commit; }else{ //回滚事务 rollback; }
上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。
上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。
使用mysql实现分布式锁
我们创建一个分布式锁表,如下
DROP TABLE IF EXISTS t_lock; create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间', version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1' )COMMENT '锁信息表';
java代码如下
mapper接口
package com.shiguiwu.springmybatis.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.shiguiwu.springmybatis.lock.model.LockModel; import org.springframework.stereotype.Repository; /** * @description: 锁mapper * @author: stone * @date: Created by 2021/5/30 11:12 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.mapper */ @Repository public interface LockMapper extends BaseMapper<LockModel> { }
锁对象model
package com.shiguiwu.springmybatis.lock.model; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.Version; import lombok.Data; /** * @description: 锁模型 * @author: stone * @date: Created by 2021/9/10 11:13 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock.model */ @Data @TableName("t_lock") public class LockModel { /** * 锁的唯一值 */ @TableId private String lockKey; /** * 请求id,同一个线程里请求id一样 */ private String requestId; //锁次数 private Integer lockCount; //锁超时 private Long timeout; //乐观锁版本 @Version private Integer version; }
锁接口
package com.shiguiwu.springmybatis.lock; /** * @description: 锁接口 * @author: stone * @date: Created by 2021/9/10 11:40 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ public interface ILock<T> { /** * 获取分布式锁,支持重入 * @param lockKey 锁可以 * @param lockTimeout 持有锁的有效时间,防止死锁 * @param getTimeout 获取锁超时时间, * @return 是否锁成功 */ public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception; /** * 解锁 * @param lockKey 锁key * */ public void unlock(String lockKey); /** * 重置锁对象 * @param t 锁对象 * @return 返回锁记录 */ public int restLock(T t); }
锁的实现代码如下
package com.shiguiwu.springmybatis.lock; import cn.hutool.core.util.StrUtil; import com.shiguiwu.springmybatis.lock.model.LockModel; import com.shiguiwu.springmybatis.mapper.LockMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @description: mysql实现分布式锁 * @author: stone * @date: Created by 2021/9/10 11:09 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis.lock */ @Component @Slf4j public class MysqlLock implements ILock<LockModel>{ static ThreadLocal<String> requestIds = new ThreadLocal<>(); @Autowired private LockMapper lockMapper; public String getRequestId() { String requestId = requestIds.get(); if (StrUtil.isBlank(requestId)) { requestId = UUID.randomUUID().toString(); requestIds.set(requestId); } log.info("获取到的requestId===> {}", requestId); return requestId; } /** * 获取锁 * @param lockKey 锁可以 * @param lockTimeout 持有锁的有效时间,防止死锁 * @param getTimeout 获取锁超时时间, * @return */ @Override public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception { log.info(" lock start =======================> {}",lockKey); //从local中获取 请求id String requestId = this.getRequestId(); //获取锁的结果 boolean lockResult = false; //开始时间 long startTime = System.currentTimeMillis(); while (true) { LockModel lockModel = lockMapper.selectById(lockKey); if (Objects.nonNull(lockModel)) { //获取锁对象的请求id String reqId = lockModel.getRequestId(); //如果是空,表示改锁未被占有 if (StrUtil.isBlank(reqId)) { //马上占有它 //设置请求id lockModel.setRequestId(requestId); //设置锁次数 lockModel.setLockCount(1); //设置超时时间,防止死锁 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁 else if (requestId.equals(reqId)) { //可重入锁 lockModel.setTimeout(System.currentTimeMillis() + lockTimeout); //设置获取初次 lockModel.setLockCount(lockModel.getLockCount() + 1); if (lockMapper.updateById(lockModel) == 1) { lockResult = true; break; } } //不为空,也不相等,说明是其他线程占有 else { //锁不是自己的,并且已经超时了,则重置锁,继续重试 if (lockModel.getTimeout() < System.currentTimeMillis()) { //未超时,继续重试 this.restLock(lockModel); } //如果未超时,休眠100毫秒,继续重试 else { if (startTime + getTimeout > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { //防止长时间阻塞 break; } } } } //如果是空,就插入一个锁,重新尝试获取锁 else { lockModel = new LockModel(); //设置锁key lockModel.setLockKey(lockKey); lockMapper.insert(lockModel); } } log.info(" lock end =======================> {}",lockKey); return lockResult; } /** * 释放锁 * @param lockKey 锁key */ @Override public void unlock(String lockKey) { LockModel lockModel = lockMapper.selectById(lockKey); //获取当前线程的请求id String reqId = this.getRequestId(); //获取锁次数 int count = 0; //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁 if (Objects.nonNull(lockModel) && reqId.equals(lockModel.getRequestId()) && (count = lockModel.getLockCount()) > 0) { if (count == 1) { //重置锁 this.restLock(lockModel); } //重入锁的问题,锁的次数减一 else { lockModel.setLockCount(lockModel.getLockCount() - 1); //更新次数 lockMapper.updateById(lockModel); } } } /** * 重置锁 * @param lockModel 锁对象 * @return 更新条数 */ @Override public int restLock(LockModel lockModel) { lockModel.setLockCount(0); lockModel.setRequestId(""); lockModel.setTimeout(0L); return lockMapper.updateById(lockModel); } }
上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock。
测试用例
package com.shiguiwu.springmybatis; import com.shiguiwu.springmybatis.lock.ILock; import com.shiguiwu.springmybatis.lock.model.LockModel; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * @description: 锁测试 * @author: stone * @date: Created by 2021/9/10 15:32 * @version: 1.0.0 * @pakeage: com.shiguiwu.springmybatis */ @SpringBootTest @Slf4j public class LockApplicationTests { @Autowired private ILock<LockModel> mysqlLock; 测试重复获取和重复释放 @Test public void testRepeat() throws Exception { for (int i = 0; i < 10; i++) { mysqlLock.lock("key1", 10000L, 1000); } for (int i = 0; i < 10; i++) { mysqlLock.unlock("key1"); } } // //获取之后不释放,超时之后被thread1获取 @Test public void testTimeout() throws Exception { String lockKey = "key2"; mysqlLock.lock(lockKey, 5000L, 1000); Thread thread1 = new Thread(() -> { try { mysqlLock.lock(lockKey, 5000L, 7000); } catch (Exception e) { e.printStackTrace(); } finally { mysqlLock.unlock(lockKey); } }, "thread1"); thread1.start(); thread1.join(); } }
test1方法测试了重入锁的效果。
test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了
留给大家一个问题
上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。
到此这篇关于MySql实现分布式锁的示例代码的文章就介绍到这了,更多相关MySql 分布式锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!