Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis 延迟队列

Redis实现延迟队列的项目示例

作者:yifanghub

延迟队列是Redis的一个重要应用场景,本文主要介绍了Redis实现延迟队列的项目示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

最近用到一个延迟消息的功能,第一时间想到使用MQ或者MQ的插件,因为数据量不大,所以尝试使用Redis来实现了,毕竟Redis也天生支持类似MQ的队列消费,所以,在这里总结了一下Redis实现延迟消息队列的方式。

一、监听key过期时间

处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(KeyExpirationEventMessageListener),然后在客户端的回调方法中处理逻辑。
1)新建SpringBoot项目,maven依赖及yml如下
maven依赖:

<dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-data-redis</artifactId>
     </dependency>

     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
     </dependency>

 </dependencies>

yml文件

server:
  port: 8000

spring:
  redis:
    database: 0
    host: xxxx
    port: 6379
    password: xxxxxx
    lettuce:
      pool:
        #最大连接数
        max-active: 8
        #最大阻塞等待时间
        max-wait: -1
        #最大空闲
        max-idle: 8
        #最小空闲
        min-idle: 0
    #连接超时时间
    timeout: 5000

2)修改redis.conf文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events Ex,该配置表示监听key的过期事件

3)设置Redis监听配置,注入Bean RedisMessageListenerContaine

@Configuration
public class RedisTimeoutConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpiredListener keyExpiredListener() {
        return new KeyExpiredListener(this.redisMessageListenerContainer());
    }
}

4)创建监听器类,重写key过期回调方法onMessage

@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    @Autowired
    public RedisTemplate<String, String> redisTemplate;

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("redis key 过期:bytes={},channel={},key={}", new String(bytes), channel, key);
    }
}

5)编写测试接口:写入一个带过期时间的key

@RestController
@RequestMapping("/demo")
public class BasicController {

    @Autowired
    public RedisTemplate<String, String> redisTemplate;

    @GetMapping(value = "/test")
    public void redisTest() {
        redisTemplate.opsForValue().set("test", "5s后过期", 5, TimeUnit.SECONDS);
    }
}

执行后,onMessage监听方法打印结果:

 redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test

该方案缺点:可靠性问题,Redis 是一个内存数据库,尽管它提供了数据持久化选项(如 RDB 和 AOF),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。

二、zset + score

基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 DelayedMessageService 类

@Slf4j
@Service
public class DelayedMessageService {

    private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";


    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void addMessage(String message, long delayMillis) {
        long score = System.currentTimeMillis() + delayMillis;
        redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);
    }


    @Scheduled(fixedRate = 1000)
    public void processMessages() {
        long now = System.currentTimeMillis();
        Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);
        if (messages != null && !messages.isEmpty()) {
            for (ZSetOperations.TypedTuple<String> message : messages) {
                String msg = message.getValue();
                long score = message.getScore().longValue();
                if (score <= now) {
                    // Process the message
                    System.out.println("Processing message: " + msg);
                    // Remove the message from the zset
                    redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);
                }
            }
        }else{
            log.info("定时任务执行~");
        }
    }


}

2)编写Controller接口测试,初始化zset内容

@RestController
@RequestMapping("/demo")
public class BasicController {

    @Autowired
    private DelayedMessageService delayedMessageService;

    @GetMapping(value = "/test2")
    public void redisZsetTest() {
        // Add some messages with delays
        delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
        delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
        delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
    }
}

说明:

三、Redisson框架

利用 Redisson 提供的数据结构RDelayedQueueRBlockingDeque,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖

<!-- Redisson 依赖项 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>2.15.1</version>
</dependency>

2)创建DelayedMessageService

@Slf4j
@Service
public class DelayedMessageService {

    @Autowired
    private RedissonClient redissonClient;

    private RBlockingDeque<String> blockingDeque;
    private RDelayedQueue<String> delayedQueue;

    @PostConstruct
    public void init() {
        this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");
        this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

        Executors.newSingleThreadExecutor().submit(this::processMessages);
    }

    public void addMessage(String message, long delayMillis) {
        delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);
    }

    public void processMessages() {
        try {
            while (true) {
                String message = blockingDeque.take();
                // Process the message
                log.info("消息被处理: " + message);
                // ..业务逻辑处理
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("中断异常",e);
        }
    }


}

3)测试接口

@GetMapping(value = "/test3")
    public void redisQueueTest() {
        // Add some messages with delays
        delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
        delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
        delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
    }

说明:

RDelayedQueue 是 Redisson 提供的延迟队列,它将消息存储在指定的队列中,直到消息到期才会被转移到该队列。它的主要作用包括:

RBlockingQueue是 Redisson 提供的阻塞队列,它支持阻塞操作。主要作用包括:

总结
个人推荐使用Redisson 的RDelayedQueue 方式,感觉更加可靠和简单一些,当然zset+score也可以是个不错选择,毕竟更加灵活,延迟消息还有其他不同的方案,比如rocketmq、rabbitmq插件等,假如项目中用了redis,又不想引入更多的中间件,可以尝试使用redis来实现,为了测试,这里例子都比较简单,在实际使用过程中,还要考虑补偿机制、幂等性等问题。

参考:

1.https://blog.csdn.net/qq_34826261/article/details/120598731

到此这篇关于Redis实现延迟队列的项目示例的文章就介绍到这了,更多相关Redis 延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文