java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Guava本地缓存的使用

Guava本地缓存的使用过程

作者:肥肥肥柯

文章介绍了使用Guava和Redis实现二级缓存的原因,以及如何通过Guava作为一级缓存,Redis作为二级缓存来减少数据库压力,提高缓存的可靠性,同时,通过一个具体示例说明了如何在微服务场景中使用Guava和Redis进行二级缓存,最后,总结了Guava的参数机制

Guava和Redis实现二级缓存

1、目的

本地缓存为什么不使用hashMap或者concurrentHashMap

concurrentHahMap和hashMap一样,都是长期存在的缓存,除非调用remove方法,否则缓存中的数据无法主动释放

仅使用Guava本地缓存会有什么问题?

作为API或者某种功能系统来用的话,无论单机/集群(集群其实就形成了近乎Guava副本的情况),Guava中的数据增长到后期不可估量的时候,Guava是支撑不住的;而微服务情况下没法全局缓存,如果数据量无限增长、不可控的话还是不建议使用。

仅使用Redis缓存会有什么问题?

大数量的情况下(热搜)容易引发缓存雪崩进而导致服务器雪崩。

综上,结合Guava、Redis,Guava作为一级缓存,Redis作为二级缓存,可以在减少数据库压力的基础上,将“缓存”这道防线做的更加可靠。 

2、二级缓存场景示例

公司有一款摄像头,放在了我家经常无人居住的豪宅了,摄像头包括异常人像报警、断电报警、信号异常报警、捕获画面动态报警等等多种报警功能类型(跳过其他设定,规定同类型的报警间隔5秒内仍存在则继续报警)。

现在有需求:我可以在平台上配置我想要报警的报警类型(不然我哪天周末回豪宅了它还一直报警到平台打扰我休息),当有我报警信息过来并且是匹配我配置的报警信息时,这个这条报警将推送到我平台首页。

//这里忽略报警系统代码,报警系统推送报警消息是通过RocketMQ实现
topic: alarm-camera
@Configuration
public class RocketMqConsumer {
    private static Logger logger = LogManager.getLogger(RocketMqConsumer.class);


    public void init() {
        pullAlarm();
        logger.warn("rocketmq拉取告警数据成功!");
    }

    /**
     * pullAlarm:拉取告警源数据。
     * @author liaokh
     * @since JDK 1.8
     */
    public static void pullAlarm() {
        new Thread() {
            public void run() {
                logger.warn("---------开始消费报警broker---------");
                try {
                    // 声明并初始化一个consumer
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer-dev-camera" + "-alarm");

                    // 同样也要设置NameServer地址
                    consumer.setNamesrvAddr("我的RocketMQ服务器地址");

                    // 广播模式 当 Consumer 使用广播模式时,每条消息都会被 Consumer 集群内所有的 Consumer 实例消费一次。
                    consumer.setMessageModel(MessageModel.BROADCASTING);

                    // 这里设置的是一个consumer的消费策略
                    // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
                    // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
                    // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

                    // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
                    consumer.subscribe("alarm-camera", "*");

                    // 设置一个Listener,主要进行消息的逻辑处理
                    consumer.registerMessageListener(new MessageListenerConcurrently() {

                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                        ConsumeConcurrentlyContext context) {

                            for (MessageExt msg : msgs) {
                                try {
                                    String tag = msg.getTags();
                                    String alarmJson = new String(msg.getBody());
                                    logger.warn("收到alarm-camera数据:tag:" + tag + " alarmJson:" + alarmJson);
                                    CameraAlarmResp resultAlarm = new CameraAlarmResp();
                                    AlarmMQResp alarm = JSON.parseObject(alarmJson, AlarmMQResp.class);
                                    //查看当前告警类型是否在该用户配置的列表中
                                    //根据摄像头设备号获取用户信息
                                    Camera cameraEntity = Utils.getCameraById(alarm.getCameraId());  //这种核心数据也可以加载到缓存中
                                    UserAlarm userAlarm = Utils.getUserAlarm(cameraEntity.getUserId());
                                    if (userAlarm == null || StringUtils.isBlank(userAlarm.getAlarmIds())){
                                        logger.error("设备号" + alarm.getId() + "的用户未配置需要推送的告警类型");
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                    boolean isReturn = true;
                                    //获取该用户告警列表过滤
                                    String[] userAlarmArr = userAlarm.getAlarmIds().split(",");
                                    for (String s : userAlarmArr) {
                                        if (alarm.getAlarmType().equals(s)){  //说明需要推送
                                            isReturn = false;
                                        }
                                    }
                                    if (isReturn){
                                        //匹配则该告警不需要推送,直接消费成功
                                        logger.warn("该设备号的用户未配置需要推送的告警类型");
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }

                                    WebSocket webSocket = SpringUtil.getBean(WebSocket.class);
                                    //创建业务消息信息
                                    JSONObject obj = new JSONObject();
                                    obj.put("cmd", "alarm");//业务类型
                                    obj.put("msgId", msg.getMsgId());//消息id
                                    obj.put("msgTxt", JSON.toJSONString(alarm));//消息内容
                                    //单个用户发送
                                    webSocket.sendOneMessage(alarm.getUserId(), obj.toJSONString());
                                } catch (Exception e) {
                                    logger.error("请求异常", e);
                                }
                            }
                            // 返回消费状态,消费成功
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    });

                    // 调用start()方法启动consumer
                    consumer.start();
                    logger.warn("rocketmq消费者创建成功");
                } catch (Exception e) {
                    logger.error("请求异常", e);
                }
            }
        }.start();
    }
}

该消费者将消费从报警系统推送过来的报警信息,如果符合用户配置的报警类型,就通过WebSocket(这里只需要知道websocket是用来和前端建立长连接的,如果需要详细了解其意义和使用请参考相关文章)推送到前端。

其实上面示例有提到,同种类型告警5s内如果仍然有报警将会5s后推送到平台,因此,为了避免每条MQ过来的时候,都去数据库查一次配置表,可能一个半个的用户报警消息算起来很少,但是如果这个摄像头大卖,涉及到大规模用户时,这种MQ将会变得特别多,每次MQ推送报警过来时候都要去判断是否推送,规模大了这个查库过程就显得特别low。

索性将这种配置数据存到缓存中,至于仅使用Guava或者仅使用Redis或者像本文一样结合使用,又或者根据项目发展递进使用,就取决于你自己了。 

@Component
public class Utils {
	private static final Logger logger = LoggerFactory.getLogger(GuavaCacheUtils.class);
	
	/**
	* 获取用户告警配置信息
	*/
    public static UserAlarm getUserAlarm(String userId){
		if(StringUtils.isBlank(userId)){
			return null;
		}
		UserAlarm userAlarm = null;
		try {
			userAlarm = GuavaCacheUtils.userAlarmCache.get(userId).orNull();
			if(null == userAlarm){
				GuavaCacheUtils.userAlarmCache.invalidate(userId);  //清除Guava的缓存
                //尝试从Redis中获取
                String userAlarmJson = RedisUtils.hget("alarm_camera", userId);
				userAlarm = JSON.parseObject(userAlarmJson, UserAlarm.class);
			}
		} catch (ExecutionException e) {
			logger.error("获取用户配置缓存异常",e);
		}
		return userAlarm;
	}

	/**
	* 获取摄像头信息
	*/
	public static Camera getCameraById(String cameraId){
		Camera camera= null;
		try {
			camera= GuavaCacheUtils.cameraCache.get(cameraId).orNull();
		} catch (ExecutionException e) {
			logger.error("获取设备数据异常异常",e);
		}
		return device;
	}
}
/**
 * ClassName:GuavaCacheUtils <br/>
 * @version
 * @since JDK 1.8
 * @see java(jvm)缓存存储
 */
@Component
public class GuavaCacheUtils {
   private static final Logger logger = LoggerFactory.getLogger(GuavaCacheUtils.class);
	 /**
	 * 用户告警推送列表缓存
	 *
	 * expireAfterWrite:10分钟内没有更新将被回收重新获取
	 *
	 * load:获取缓存为空时执行(去数据库查询并将结果放入缓存)
	 */
	public static LoadingCache<String, Optional<UserAlarm>> userAlarmCache = CacheBuilder.newBuilder()
			.expireAfterAccess(10, TimeUnit.MINUTES).build(new CacheLoader<String, Optional<UserAlarm>>() {
				@Override
				public Optional<UserAlarm> load(String userId) throws Exception {
                    UserAlarm userAlarm = SpringUtil.getBean(UserAlarmService.class)
                        .getOne(new LambdaQueryWrapper<UserAlarm>()
									.eq(UserAlarm::getUserId,userId));
					return Optional.fromNullable(userAlarm);
				}
			});

	/**
	 * 摄像头设备信息缓存
	 */
	public static LoadingCache<String, Optional<Camera>> cameraCache = CacheBuilder.newBuilder()
			.expireAfterAccess(10, TimeUnit.MINUTES)
			.build(new CacheLoader<String, Optional<Camera>>() {
				@Override
				public Optional<Camera> load(String cameraId) throws Exception {
					String cameraJson = RedisUtils.hget("camera", cameraId);
					Cameracamera= JSON.parseObject(cameraJson, Camera.class);
					return Optional.fromNullable(camera);
				}
			});
}
@Service
public class UserAlarmServiceImpl extends ServiceImpl<UserAlarmMapper, UserAlarm> implements UserAlarmService{
    //新增用户告警配置
    @Override
    public String insert(UserAlarm userAlarm){
        try{
            this.save(userAlarm);
            //随即存入Redis
            RedisUtil.hset("alarm_camera",userAlarm.getUserId,userAlarm);
        } catch (Exception e) {
            return "失败啦";
        }
        return "成功咯";
    }
    
    //修改用户告警配置
    @Override
    public String update(UserAlarm userAlarm){
        try{
            UpdateWrapper<UserAlarm> wrapper = new UpdateWrapper();
            wrapper.set("alarmType",userAlarm.getAlarmType());
                .eq("user_id",userAlarm.getUserId);
            this.save(userAlarm);
            //随即更新Redis
            RedisUtil.hset("alarm_camera",userAlarm.getUserId,userAlarm);
        } catch (Exception e) {
            return "失败啦";
        }
        return "成功咯";
    }
}

3、Guava参数机制

#回收机制

#刷新机制

/**
 * ClassName:GuavaCacheUtils <br/>
 * @version
 * @since JDK 1.8
 * @see java(jvm)缓存存储
 */
@Component
public class GuavaCacheUtils {
   private static final Logger logger = LoggerFactory.getLogger(GuavaCacheUtils.class);

   /**
    * LoadingCache登录缓存
    * 链式调用
    * removalListener:设置缓存被移除后的监听任务
    * build:构建对象
    */
   public static LoadingCache<String, Optional<User>> loginCache = CacheBuilder.newBuilder()
         .expireAfterAccess(720, TimeUnit.MINUTES).removalListener(new MyRemovalListener())
         .build(new CacheLoader<String, Optional<User>>() {
            @Override
            public Optional<User> load(String token) throws Exception {
               User user = null;
               try {
                   //到redis中匹配
                  String loginJson = RedisUtils.get(token);
                  user = JSON.parseObject(loginJson, User.class);
               } catch (Exception e) {
                  logger.error("登录缓存查询异常", e);
               }
               return Optional.fromNullable(user);
            }
         });

    /**
    * MyRemovalListener自定义缓存移除监听器,需要实现RemovalListener接口并实现RemovalListener<K,V>接口,K,V为key和value的泛型
    * Optional:主要用于解决空指针异常,简洁判空
    * notification.getCause():监听到的缓存失效原因
    */
   private static class MyRemovalListener implements RemovalListener<String, Optional<User>> {
      @Override
      public void onRemoval(RemovalNotification<String, Optional<User>> notification) {
         if (notification.getCause().toString().equals("EXPIRED")) {
            String token = notification.getKey();
            RedisUtils.del(0,token);
         }
      }
   }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文