java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java分布式锁

Java分布式锁几种常见的实现方式

作者:快乐吃手手  :  )

在java的分布式系统中,有时候会出现不同的服务操作同一个资源的情况,如交易系统和充值系统都要操作用户账户,分布式锁为解决分布式系统中多个应用同时访问同一个资源的问题,这篇文章主要介绍了Java分布式锁几种常见的实现方式,需要的朋友可以参考下

前言

分布式锁主要用于解决在分布式系统中多个节点对共享资源进行并发访问时可能出现的竞争问题。

在Java中实现分布式锁的方式主要有以下几种:

基于数据库的实现

(1)唯一约束

通过数据库表中设置唯一键约束来保证只有一个客户端可以获取到锁。通常会有一张专门的锁表,包含锁名称和锁的持有者信息等字段。

CREATE TABLE distributed_locks (
    lock_name VARCHAR(255) PRIMARY KEY,
    owner_info VARCHAR(255) NOT NULL
);

该代码示例展示了如何使用上述表来获取和释放锁,并执行相应的业务逻辑(例如更新库存),在请求A对业务进行操作的时候,假设请求B也进入到此方法,则会由于锁表的唯一索引lock_name而导致插入失败,导致其操作被拒绝,而主键则需要针对不同业务场景设置,不同业务场景不会触发锁机制。

    public void operateStock(Integer num) {
        String lockName = "product_stock_update_lock"; // 锁名称,如:业务编码
        String ownerId = "d2d00005sa5s512"; // 当前实例标识符,如:用户id

        // SQL == "INSERT INTO distributed_locks(lock_name, owner_info) VALUES (?, ?)"
        // 加锁插入成功返回true
        boolean gotLock = acquireLock(lockName, ownerId);
        if (gotLock) {
            try {
                // 执行业务逻辑,如更新指定物料库存
                //SQL == "UPDATE products SET stock = stock - #{num} WHERE product_id = '001' AND stock > 0";
                updateProductStock(num);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // SQL == "DELETE FROM distributed_locks WHERE lock_name = ? AND owner_info = ?"
                releaseLock(lockName, ownerId);
            }
        } else {
            System.out.println("Failed to acquire lock.");
        }
    }

(2)行锁或表锁

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁,当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁,而事务提交后会自动释放锁。

    @Autowired
    private ProductMapper productMapper;

    @Transactional
    public void updateProductStock(String productId) {
        //查询并加锁   @Select("SELECT * FROM products WHERE product_id = #{productId} FOR UPDATE")
        Product product = productMapper.selectForUpdate(productId);
        if (product == null || product.getStock() <= 0) {
            System.out.println("库存不足或产品不存在");
            return;
        }
        
        // 更新库存  @Update("UPDATE products SET stock = stock - 1 WHERE product_id = #{productId}")
        int rowsAffected = productMapper.updateStock(productId);
        if (rowsAffected > 0) {
            System.out.println("库存更新成功");
        } else {
            System.out.println("未能成功更新库存");
        }
    }

(3)version乐观锁

乐观锁是一种处理并发控制的策略,它假设数据冲突的概率较低,因此不会在读取数据时加锁。相反,它会在更新数据时检查数据是否被其他事务修改过。这通常通过一个版本号(version)字段或时间戳来实现。

读取数据:当一个事务读取数据时,同时获取该记录的版本号或时间戳。

修改数据:当事务尝试更新数据时,它会使用版本号作为条件的一部分进行更新操作。

CREATE TABLE products (
    product_id VARCHAR(255) PRIMARY KEY,
    stock INT NOT NULL,
    version INT DEFAULT 0
);
    @Autowired
    private ProductMapper productMapper;

    @Transactional
    public void updateProductStock(String productId) {
        // 查询并加锁
        // @Select("SELECT product_id,stock,version FROM products WHERE product_id = #{productId} FOR UPDATE")
        Product product = productMapper.selectForUpdate(productId);
        if (product == null || product.getStock() <= 0) {
            System.out.println("库存不足或产品不存在");
            return;
        }

        // 尝试更新库存,并检查版本号
        // @Update("UPDATE products SET stock = stock - 1, version = version + 1 WHERE product_id = #{productId} AND version = #{version}")
        int rowsAffected = productMapper.updateStockWithVersion(productId, product.getVersion());
        if (rowsAffected > 0) {
            System.out.println("库存更新成功");
        } else {
            System.out.println("库存更新失败,可能已被其他事务更新");
            // 这里可以根据业务需求选择重试或者抛出异常等处理方式
        }
    }

基于Redis的实现

使用Redis实现分布式锁是一种高效且广泛采用的方法,特别适合于需要高吞吐量和低延迟的场景。Redis通过其原子操作命令提供了一种简单而有效的机制来实现分布式锁。

SET resource_name my_random_value NX PX 30000

NX 表示仅在键不存在时设置键。

PX 30000 设置键的过期时间为30秒,防止死锁(如果客户端崩溃或网络问题导致无法释放锁)。

RedisTemplate实现分布式锁

编写工具类

@Component
public class MyRedisLock {

    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    // 自Spring 4.3起,如果只有一个构造函数,可以省略@Autowired注解
    public MyRedisLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Lua 脚本用于释放锁
    private static final String UNLOCK_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end";

    // Lua 脚本用于续期锁
    private static final String RENEW_LOCK_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('PEXPIRE', KEYS[1], ARGV[2]) else return 0 end";

    // 锁的前缀,用于区分不同的锁
    private static final String LOCK_PREFIX = "lock:";

    // 续期锁的时间间隔(毫秒)
    private static final long RENEW_INTERVAL_MS = 2000;


    /**
     尝试获取锁
     @param lockKey   锁的键名
     @param expireMs  锁的过期时间(毫秒)
     @param operateId 锁的值
     @return   如果成功获取锁,返回 true;否则返回 false
     **/
    public boolean tryLock(String lockKey, long expireMs, String operateId) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS);     // setIfAbsent实现上锁
        return result != null && result;
    }

    /**
     尝试获取锁并自动续期
     @param lockKey   锁的键名
     @param expireMs  锁的过期时间(毫秒)
     @param operateId 锁的值
     @return   如果成功获取锁,返回锁的唯一标识符;否则返回 null
     **/
    public boolean tryLockWithRenewal(String lockKey, long expireMs, String operateId) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS);        // setIfAbsent实现上锁
        if (result != null && result) {
            // 启动续期线程
            startRenewalThread(lockKey, operateId, expireMs);
            return true;
        }
        return false;
    }

    /**
     释放锁
     @param lockKey   锁的键名
     @param operateId 锁的值(用于验证是否是持有锁的客户端)
     @return  如果成功释放锁,返回 true;否则返回 false
     **/
    public boolean unlock(String lockKey, String operateId) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
        // 执行lua脚本,参数解释下:
        // 第一个参数script为lua脚本
        // 第二个参数为key的集合,会依次替换lua脚本中的KEYS[]数组的数据,默认1开始
        // 第三个参数为参数集合,会依次替换lua脚本中的ARGVS[]数组的数据,默认1开始
        Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId);
        return result != null && result == 1L;
    }

    /**
     自动续期锁
     @param lockKey   锁的键名
     @param operateId 锁的值(用于验证是否是持有锁的客户端)
     @param expireMs  锁的过期时间(毫秒)
     @return  如果成功续期,返回 true;否则返回 false
     **/
    public boolean renewLock(String lockKey, String operateId, long expireMs) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId, String.valueOf(expireMs));
        return result != null && result == 1L;
    }

    /**
     启动续期线程
     @param lockKey   锁的键名
     @param operateId 锁的值
     @param expireMs  锁的过期时间(毫秒)
     **/
    private void startRenewalThread(final String lockKey, final String operateId, final long expireMs) {
        Thread renewalThread = new Thread(() -> {
            try {
                while (true) {
                    // 每隔一段时间续期一次,需要确保间隔时间小于过期时间,过期或释放锁将无法续费
                    Thread.sleep(RENEW_INTERVAL_MS);
                    if (!renewLock(lockKey, operateId, expireMs)) {  // 续锁操作
                        // 如果续期失败,直接结束守护线程,停止锁续期行为。
                        // 这里说明下,删除锁和续锁都需要验证lockValue,这个上锁时通过uuid创建的,其他线程肯定获取的都不一致,这样确保续锁行为只能是自己的守护线程才可以操作;如果续锁失败了,则说明是主线程完成任务删除了key锁,所以这里守护线程也可以结束了
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        renewalThread.setDaemon(true);  // 设置为守护线程
        renewalThread.start();
    }
}

代码示例:常规锁

    @Autowired
    private MyRedisLock redisLock;

    // 锁的默认过期时间(毫秒)
    private static final long DEFAULT_EXPIRE_TIME_MS = 5000;

    public void testLock() throws InterruptedException {
        String lockKey = "my_distributed_lock";
        //进程标识ID
        String operateId = java.util.UUID.randomUUID().toString();
        if (redisLock.tryLock(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
            System.out.println(" 获取到锁,开始执行任务...");
            try {
                // 执行业务逻辑
                Thread.sleep(5000); // 模拟耗时操作
                System.out.println(Thread.currentThread().getName() + " 任务执行完成");
            } finally {
                if (redisLock.unlock(lockKey, operateId)) {
                    System.out.println(Thread.currentThread().getName() + " 成功释放锁");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 释放锁失败,锁可能已被其他客户端删除");
                }
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 未能获取到锁");
        }
    }

代码示例:续费线程锁

    @Autowired
    private MyRedisLock redisLock;

    // 锁的默认过期时间(毫秒)
    private static final long DEFAULT_EXPIRE_TIME_MS = 5000;

    public void testLock() throws InterruptedException {
        String lockKey = "my_distributed_lock";
        //进程标识ID
        String operateId = java.util.UUID.randomUUID().toString();
        // 尝试获取锁并自动续期
        if ( redisLock.tryLockWithRenewal(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
            try {
                // 执行业务逻辑
                Thread.sleep(8000); 
                System.out.println(Thread.currentThread().getName() + " 任务执行完成");
            } finally {
                // 释放锁
                boolean unlockSuccess = redisLock.unlock(lockKey, operateId);
                if (unlockSuccess) {
                    System.out.println(Thread.currentThread().getName() + " 成功释放锁");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 释放锁失败,锁可能已被其他客户端删除");
                }
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 未能获取到锁");
        }
    }

基于Zookeeper的实现

基本原理

引入依赖(一个用于简化ZooKeeper操作的框架)

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

代码示例

    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String LOCK_PATH = "/distributed_lock_example";

    public void operate() throws Exception {
        // 创建CuratorFramework实例
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
        client.start();
        // 使用InterProcessMutex作为分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
        try {
            // 获取锁
            lock.acquire(); 
            //业务操作代码
            this.performBusinessLogic();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (lock.isAcquiredInThisProcess()) {
                try {
                    // 释放锁
                    lock.release(); 
                    System.out.println("Lock released.");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 关闭客户端
            client.close(); 
        }
    }

    private static void performBusinessLogic() throws InterruptedException {
        // 模拟业务逻辑处理
        System.out.println("Performing some operations...");
        Thread.sleep(5000); // 暂停5秒模拟长时间操作
    }

基于etcd的实现(仅了解)

类似于Zookeeper,etcd也提供了类似的分布式协调服务,可以通过创建租约(lease)并附加到关键路径上来实现分布式锁。Etcd支持事务、watch机制等功能,使得它同样适用于构建分布式锁。

总结

到此这篇关于Java分布式锁几种常见的实现方式的文章就介绍到这了,更多相关Java分布式锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文