SpringBoot+Redis实现分布式缓存的方法步骤
作者:忧伤夏天的风
背景
缓存(Cache):指将程序或系统中常用的数据对象存储在像内存这样特定的介质中,以避免在每次程序调用时,重新创建或组织数据所带来的性能损耗,从而提高了系统的整体运行速度。
以目前的系统架构来说,用户的请求一般会先经过缓存系统,如果缓存中没有相关的数据,就会在其他系统中查询到相应的数据并保存在缓存中,最后返回给调用方。
本地缓存:指程序级别的缓存组件,它的特点是本地缓存和应用程序会运行在同一个进程中,所以本地缓存的操作会非常快,因为在同一个进程内也意味着不会有网络上的延迟和开销。
本地缓存适用于单节点非集群的应用场景,它的优点是快,缺点是多程序无法共享缓存,比如分布式用户 Session 会话信息保存,由于每次用户访问的服务器可能是不同的,如果不能共享缓存,那么就意味着每次的请求操作都有可能被系统阻止,因为会话信息只保存在某一个服务器上,当请求没有被转发到这台存储了用户信息的服务器时,就会被认为是非登录的违规操作。
除此之外,无法共享缓存可能会造成系统资源的浪费,这是因为每个系统都单独维护了一份属于自己的缓存,而同一份缓存有可能被多个系统单独进行存储,从而浪费了系统资源。
分布式缓存:指将应用系统和缓存组件进行分离的缓存机制,这样多个应用系统就可以共享一套缓存数据了,它的特点是共享缓存服务和可集群部署,为缓存系统提供了高可用的运行环境,以及缓存共享的程序运行机制。
本地缓存可以使用EhCache 和 Google 的 Guava来实现,而分布式缓存可以使用 Redis 或 Memcached 来实现。
由于 Redis 本身就是独立的缓存系统,因此可以作为第三方来提供共享的数据缓存,而 Redis 的分布式支持主从、哨兵和集群的模式,所以它就可以支持分布式的缓存,而 Memcached 的情况也是类似的。
分布式缓存常见文件及解决方案
分布式缓存设计的核心问题是以哪种方式进行缓存预热和缓存更新, 以及如何优雅解决缓存雪崩、缓存穿透、缓存降级等问题。这些问题在不 同的应用场景下有不同的解决方案。
缓存预热: 缓存预热指在用户请求数据前先将数据加载到缓存系统中,用户查询 事先被预热的缓存数据,以提高系统查询效率。缓存预热一般有系统启动 加载、定时加载等方式。
缓存更新: 缓存更新指在数据发生变化后及时将变化后的数据更新到缓存中。常 见的缓存更新策略有以下4种。
- 定时更新:定时将底层数据库内的数据更新到缓存中,该方法比较 简单,适合需要缓存的数据量不是很大的应用场景。
- 过期更新:定时将缓存中过期的数据更新为最新数据并更新缓存的 过期时间。
- 写请求更新:在用户有写请求时先写数据库同时更新缓存,这适用 于用户对缓存数据和数据库的数据有实时强一致性要求的情况。
- 读请求更新:在用户有读请求时,先判断该请求数据的缓存是否存 在或过期,如果不存在或已过期,则进行底层数据库查询并将查询结果更 新到缓存中,同时将查询结果返回给用户。
缓存淘汰策略 在缓存数据过多时需要使用某种淘汰算法决定淘汰哪些数据。常用的 淘汰算法有以下几种。
- FIFO(First In First Out,先进先出):判断被存储的时间,离 目前最远的数据优先被淘汰。
- LRU(Least Recently Used,最近最少使用):判断缓存最近被使 用的时间,距离当前时间最远的数据优先被淘汰。
- LFU(Least Frequently Used,最不经常使用):在一段时间内, 被使用次数最少的缓存优先被淘汰。
缓存雪崩
缓存雪崩指在同一时刻由于大量缓存失效,导致大量原本应该访问缓 存的请求都去查询数据库,而对数据库的CPU和内存造成巨大压力,严重的 话会导致数据库宕机,从而形成一系列连锁反应,使整个系统崩溃。
解决方案:
- 请求加锁:对于并发量不是很多的应用,使用请求加锁排队的方案 防止过多请求数据库。
- 失效更新:为每一个缓存数据都增加过期标记来记录缓存数据是否 失效,如果缓存标记失效,则更新数据缓存。
- 缓存数据的过期时间设置随机:为不同的数据设置不同的缓存失效时间,防止在同一时刻有大量的数据失效。
- 如果缓存数据库是分布式部署,将热点数据均匀分布在不同的缓存数据库中。
- 设置热点数据永远不过期。
缓存穿透
缓存穿透指由于缓存系统故障或者用户频繁查询系统中不存在(在系 统中不存在,在自然数据库和缓存中都不存在)的数据,而这时请求穿过 缓存不断被发送到数据库,导致数据库过载,进而引发一连串并发问题。 比如用户发起一个userName为zhangsan的请求,而在系统中并没有名 为zhangsan的用户,这样就导致每次查询时在缓存中都找不到该数据,然 后去数据库中再查询一遍。由于zhangsan用户本身在系统中不存在,自然 返回空,导致请求穿过缓存频繁查询数据库,在用户频繁发送该请求时将 导致数据库系统负载增大,从而可能引发其他问题。常用的解决缓存穿透 问题的方法有布隆过滤器和cache null策略。
解决方案:
- 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截,一定不存在的不去查询数据库。
- 布隆过滤器:指将所有可能存在的数据都映射到一个足够大的 Bitmap中,在用户发起请求时首先经过布隆过滤器的拦截,一个一定不存 在的数据会被这个布隆过滤器拦截,从而避免对底层存储系统带来查询上 的压力。
- cache null策略:指如果一个查询返回的结果为null(可能是数据 不存在,也可能是系统故障),我们仍然缓存这个null结果,但它的过期 时间会很短,通常不超过 5 分钟;在用户再次请求该数据时直接返回 null,而不会继续访问数据库,从而有效保障数据库的安全。其实cache null策略的核心原理是:在缓存中记录一个短暂的(数据过期时间内)数 据在系统中是否存在的状态,如果不存在,则直接返回null,不再查询数 据库,从而避免缓存穿透到数据库上。
缓存击穿
缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力
解决方案:
- 设置热点数据永远不过期。
- 加互斥锁。
缓存降级
缓存降级指由于访问量剧增导致服务出现问题(如响应时间慢或不响 应)时,优先保障核心业务的运行,减少或关闭非核心业务对资源的使 用。
服务降级策略:
- 写降级:在写请求增大时,可以只进行Cache的更新,然后将数据 异步更新到数据库中,保证最终一致性即可,即将写请求从数据库降级为 Cache。
- 读降级:在数据库服务负载过高或数据库系统故障时,可以只对 Cache进行读取并将结果返回给用户,在数据库服务正常后再去查询数据 库,即将读请求从数据库降级为Cache。这种方式适用于对数据实时性要求 不高的场景,保障了在系统发生故障的情况下用户依然能够访问到数据, 只是访问到的数据相对有延迟。
一、在pom中添加依赖
<!--springboot redis依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
二、在配置文件中配置Redis连接
# Redis数据库索引(默认为0) spring.redis.database=0 # Redis服务器地址 spring.redis.host=127.0.0.1 # Redis服务器连接端口 spring.redis.port=6379 # Redis服务器连接密码(默认为空) spring.redis.password=123456 # 连接池最大连接数(使用负值表示没有限制) spring.redis.jedis.pool.max-idle=10 # 连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.jedis.pool.max-wait=3000 # 连接池中的最小空闲连接 spring.redis.jedis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.timeout=3000
三、编写Redis配置文件
@Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { /** * RedisTemplate相关配置 * 使redis支持插入对象 * * @param factory * @return 方法缓存 Methods the cache */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); // 序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer jacksonSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSerializer.setObjectMapper(om); // 值采用json序列化 template.setValueSerializer(jacksonSerializer); // 使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); // 设置hash key 和value序列化模式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } }
四、测试Redis缓存
/** * @Author: oyc * @Description: redis 测试控制类 * @Since 2020年5月12日 23:35:05 */ @RestController @RequestMapping("/redis") public class OyRedisController { /** * 依赖注入,注入redisTemplate */ @Autowired private RedisTemplate redisTemplate; /** * 测试redis string add */ @GetMapping("/string/add") public String addStringKeyValue(@RequestParam(value = "key", defaultValue = "key1") String key, @RequestParam(value = "value", defaultValue = "redis value") String value) { redisTemplate.opsForValue().set(key, value); return (String) redisTemplate.opsForValue().get(key); } /** * 测试redis string add */ @GetMapping("/object/add") public Object addObjectKeyValue(@RequestParam(value = "key", defaultValue = "key1") String key) { OyUser user = new OyUser(1, "宋江", "18", "male"); redisTemplate.opsForValue().set(key, user); return redisTemplate.opsForValue().get(key); } /** * 测试redis string get */ @GetMapping("/string/get") public Object getStringByKey(@RequestParam(value = "key", defaultValue = "key1") String key) { return redisTemplate.opsForValue().get(key); } }
4.1 测试key、value 为string
4.1.1 使用接口往redis添加记录:
4.1.2 使用IDEA的redis客户端插件查看记录:
4.1.3 使用接口获取缓存数据:
4.2 测试key为string,value 为对象
五、工具类
上面案例都是直接用RedisTemplate操作Redis,操作比较复杂,借鉴网友写了一个RedisUtils工具类,RedisUtils交给Spring容器实例化,使用时直接注解注入,使用方便简单,减少使用难度。
/** * @Description: redisTemplate封装 * Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。 * @Author: oyc * @Since 2020年5月12日 23:41:08 */ @Component public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; public RedisUtil(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } //==========================基本操作=============================== /** * 指定缓存失效时间 * * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * * @param key 可以传一个值 或多个 */ public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete(CollectionUtils.arrayToList(key)); } } } /** * 模糊查询获取key值 * * @param pattern * @return */ public Set keys(String pattern) { return redisTemplate.keys(pattern); } /** * 使用Redis的消息队列 * * @param channel * @param message 消息内容 */ public void convertAndSend(String channel, Object message) { redisTemplate.convertAndSend(channel, message); } //============================String============================= /** * 普通缓存获取 * * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * * @param key 键 * @param delta 要增加几(大于0) * @return */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * * @param key 键 * @param delta 要减少几(小于0) * @return */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } //================================hash================================= /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashGet * * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 获取hashKey对应的所有键值 * * @param key 键 * @return 对应的多个键值 */ public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } //============================set============================= /** * 根据key获取Set中的所有值 * * @param key 键 * @return */ public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) { expire(key, time); } return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * * @param key 键 * @return */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } //===============================list================================= /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * * @param key 键 * @return */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } }
源码: https://github.com/oycyqr/springboot-learning-demo/tree/master/springboot-redis
到此这篇关于SpringBoot+Redis 实现分布式缓存的方法步骤的文章就介绍到这了,更多相关SpringBoot Redis 分布式缓存内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!