java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java三级缓存

java如何实现高并发场景下三级缓存的数据一致性

作者:码出极致

这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:

1.缓存结构

2.数据读取流程

3.数据更新流程

4.并发控制

5.集群同步

该实现综合运用了延迟双删、发布订阅、锁机制和TTL等多种策略,保障了高并发场景下三级缓存的数据一致性,尤其适合分布式微服务架构。

实战代码:

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.redisson.api.*;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheBuilder;

@Service
public class CacheService {
    // 本地一级缓存(Caffeine)
    private final Cache<String, Object> localCache;
    // Redisson客户端,用于分布式操作
    private final RedissonClient redissonClient;
    // 锁缓存,用于控制并发
    private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
    // 延迟任务执行器
    private final ScheduledExecutorService scheduledExecutorService;
    // 主题订阅,用于接收集群消息
    private final RTopic cacheClearTopic;

    @Autowired
    public CacheService(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
        this.localCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(10, TimeUnit.MINUTES)
                .build();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(5);
        this.cacheClearTopic = redissonClient.getTopic("cache:clear");
        
        // 注册消息监听器
        cacheClearTopic.addListener(String.class, (channel, key) -> {
            localCache.invalidate(key);
        });
    }

    // 读取缓存
    public Object get(String key) {
        // 1. 先查本地缓存
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }

        // 2. 本地缓存未命中,查Redis
        RMap<String, Object> redisMap = redissonClient.getMap("cache");
        value = redisMap.get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }

        // 3. Redis未命中,查数据库
        ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            // 双重检查
            value = localCache.getIfPresent(key);
            if (value != null) {
                return value;
            }
            
            value = redisMap.get(key);
            if (value != null) {
                localCache.put(key, value);
                return value;
            }

            // 从数据库读取
            value = readFromDatabase(key);
            if (value != null) {
                // 放入Redis并设置TTL
                redisMap.put(key, value, 300, TimeUnit.SECONDS);
                // 放入本地缓存
                localCache.put(key, value);
            }
            return value;
        } finally {
            lock.unlock();
            lockMap.remove(key);
        }
    }

    // 更新数据
    public void update(String key, Object value) {
        // 使用分布式锁保证写操作的原子性
        RLock lock = redissonClient.getLock("writeLock:" + key);
        lock.lock();
        try {
            // 1. 更新数据库
            boolean success = updateDatabase(key, value);
            if (success) {
                // 2. 先删除本地缓存
                localCache.invalidate(key);
                // 3. 删除Redis缓存
                RMap<String, Object> redisMap = redissonClient.getMap("cache");
                redisMap.remove(key);
                // 4. 发布清除缓存的消息到集群
                cacheClearTopic.publish(key);
                // 5. 延迟双删
                scheduledExecutorService.schedule(() -> {
                    redisMap.remove(key);
                }, 100, TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    // 从数据库读取数据(示例方法)
    private Object readFromDatabase(String key) {
        // 实际实现中会查询数据库
        return "data_from_db_" + key;
    }

    // 更新数据库(示例方法)
    private boolean updateDatabase(String key, Object value) {
        // 实际实现中会更新数据库
        return true;
    }
}    

redisson配置

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        // 单机模式配置
        config.useSingleServer()
              .setAddress("redis://localhost:6379")
              .setConnectionMinimumIdleSize(5)
              .setConnectionPoolSize(50);
        
        // 集群模式配置示例
        /*
        config.useClusterServers()
              .addNodeAddress("redis://node1:6379", "redis://node2:6379")
              .setScanInterval(2000)
              .setMasterConnectionMinimumIdleSize(10)
              .setMasterConnectionPoolSize(64)
              .setSlaveConnectionMinimumIdleSize(10)
              .setSlaveConnectionPoolSize(64);
        */
        
        return Redisson.create(config);
    }
}    

到此这篇关于java如何实现高并发场景下三级缓存的数据一致性的文章就介绍到这了,更多相关java三级缓存内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文