SpringBoot使用Redis实现消息队列的方法小结
作者:springdoc.cn
使用 Redis 实现消息队列的几种方式
Redis 提供了多种方式来实现消息队列。
Pub/Sub
订阅发布模式,发布者把消息发布到某个 Channel,该 Channel 的所有订阅者都会收到消息。但是这种方式最大的问题是 「发布出去的消息,如果没有被监听消费,或者消费过程中宕机,那么消息就会永远丢失」。适合用于临时通知之类的场景,对于需要保证数据不丢失的场景不能使用这种方式。
List
List 是 Redis 提供的一种数据类型,底层是链表,可以用来实现队列、栈。
Stream
Stream 是一个由 Redis 5 引入的,功能完善的消息队列。想必也是 Redis 官方团队看到太多人拿 Redis 当消息队列使,于是干脆就在 Redis 上设计出一个类似于 Kafka 的消息队列。
Steam 支持消费组消费,一条消息只能被消费组中的其中一个消费者消费。支持 「消息确认」、支持 「回溯消费」 还支持把未 ACK(确认)的消息转移给其他消费者进行重新消费,在进行转移的时候还会累计消息的转移次数,当次数达到一定阈值还没消费成功,就可以放入死信队列。
这也是 Redis 种最复杂的一种数据类型。如果你真的到了需要使用 Redis Steam 作为消息队列的地步,那不如直接使用 RabbitMQ 等更加成熟且稳定的消息队列系统。
使用 List 实现可靠的消息队列
目前来说,这是用得最多的一种方式,适用于大多数简单的消息队列应用场景。List 类型有很多指令,但是作为消息队列来说用到的只有几个个:
「LPUSH key element [element ...]
」
把元素插入到 List 的首部,如果 List 不存在,会自动创建。
「BRPOPLPUSH source destination timeout
」
移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。
当 source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。
注意,这个命令是 「原子性」 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!
「RPOPLPUSH source destination
」
同上,它是 BRPOPLPUSH
命令的 「非阻塞」 版,如果 List 中没有元素就会立即返回 null
。
「LREM key count element
」
从 List 中删除元素,count 的值不同,删除的方式也不同:
count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。
count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。
count = 0:删除所有与元素相等的元素。
「
BLMOVE
和LMOVE
命令」
LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>
BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout
从 Redis 6.2.0 开始,
BRPOPLPUSH
和RPOPLPUSH
命令就被声明为废弃了,取而代之的是语义更加明确的BLMOVE
和LMOVE
命令。❞
BLMOVE
和LMOVE
可以通过参数指定元素出队列(source)的方向,和入队列(destination)的方向,除此以外并无其他区别。
实现思路
了解了上述几个命令后,一个简单易用且可靠的消息队列就呼之欲出了。
生产者使用
LPUSH
命令往消息队列生产消息消费者使用
BRPOPLPUSH
命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息消费者成功消费完毕后,使用
LREM
命令从 Pending 队列中删除这条消息,整个消费过程结束如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)
在 Spring Boot 中实现
首先,创建 Spring Boot 项目,并整合 Redis。
创建一个 OrderConsumer
Bean 模拟从队列中消费订单 ID。
package cn.springdoc.demo.consumer; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component public class OrderConsumer implements ApplicationRunner, Runnable { static final Logger log = LoggerFactory.getLogger(OrderConsumer.class); // 消息队列 final String queue = "queue_orders"; // pending 队列,即待确认消息的队列 final String pendingQueue = "pending_queue_orders"; @Autowired StringRedisTemplate stringRedisTemplate; @Override public void run(ApplicationArguments args) throws Exception { // 应用启动后,创建新的线程来执行消费任务 Thread thread = new Thread(this); thread.setName("order-consumer-thread"); thread.start(); } @Override public void run() { while (true) { try { // 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的 // 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS); if (item == null) { log.info("等待消息 ..."); continue ; } try { // 2:解析为 Long Long orderId = Long.parseLong(item); // 模拟消息消费 log.info("消费消息: {}", orderId); } catch (Exception e) { log.error("消费异常:{}", e.getMessage()); continue; } // 3:消费成功,从 pending 队列删除记录,相当于确认消费 stringRedisTemplate.opsForList().remove(pendingQueue, 0, item); } catch (Exception e) { log.error("队列监听异常:{}", e.getMessage()); break; } } log.info("退出消费"); } }
OrderConsumer
实现了 ApplicationRunner
接口,在应用就绪后创建新的消费线程进行消费。
stringRedisTemplate.opsForList().rightPopAndLeftPush
方法从 queue
队列消费一条消息,同时把消息添加到 pendingQueue
队列。该方法底层调用的正是 brpoplpush
命令,最多阻塞 5 秒,超时后返回 null
。
得到消息后解析为 Long
类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove
方法(底层正是 LREM
命令)从 pendingQueue
队列中删除消息。如果消费失败,失败的消息会在 pendingQueue
队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。
测试
启动应用后,通过 Redis 客户端往 queue_orders
队列推送消息:
> lpush queue_orders 10000 "1" > lpush queue_orders 10010 "1" > lpush queue_orders 10011 "1" > lpush queue_orders Nan "1"
往 queue_orders
队列推送了四条订单的 ID。注意最后一条消息值是 Nan
,这会导致 Long.parseLong
异常从而导致消费失败。
服务端输出日志如下:
[ main] cn.springdoc.demo.DemoApplication : Started DemoApplication in 3.769 seconds (process running for 4.18) [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10000 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10010 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费消息: 10011 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消费异常:For input string: "Nan" [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
符合预期,前面三条消息都成功消费,最后一条消息消费失败。按照设计,这条消费失败的消息应该在 Pending 队列 pending_queue_orders
中存在。且应该只有这一条消息,因为其他三条消息都消费成功。
查看 pending_queue_orders
队列中的所有元素:
> lrange pending_queue_orders 0 -1 1) "Nan"
一切 OK,该队列中只有 Nan
这条消息,正是消费失败的那条消息。
此时,你如果想查看一下 Redis 中的所有 key,你会发现只有 pending_queue_orders
队列存在:
> keys * 1) "pending_queue_orders"
queue_orders
队列呢?这是 Redis List
的一个特性,当从 List
中弹出最后一个元素后,Redis 就会删除这个 List
。queue_orders
中的元素都被弹出了,所以它被删除了。当再次尝试往 queue_orders
中压入消息时,它会自动创建。也就是说 「我们不需要手动预先创建队列, Redis 会自己创建,也会在合适的时间删除,而这一切都是线程安全的」。
由于这是线程安全的,所以队列中的 「一条消息只能被一个消费者(客户端)进行消费」,这非常适合在分布式或者是集群模式下使用,不必担心同一条消息被多个消费者消费到。
注意,Pending 队列中的消息可能存在重复消费的可能。例如,消费者成功消费消息后,在调用 remove
方法从 Pending 队列中删除消息时失败,那么 Pending 队列中的这条删除失败的消息其实已经是被成功消费了的,需要在业务中考虑到!
使用 BLMOVE 和 LMOVE 命令
上文说过,从 Redis 6.2.0 开始 BRPOPLPUSH
和 RPOPLPUSH
命令就被声明为废弃了,后续版本中推荐使用 BLMOVE
和 LMOVE
命令。
目前 StringRedisTemplate
(Spring Boot 3.2.2)并未直接提供与 BLMOVE
和 LMOVE
命令对应的 API 方法,但是可以获取到底层连接对象来调用 BLMOVE
和 LMOVE
命令。
String item = this.stringRedisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { // 调用 bLMove 命令 byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5); return ret == null ? null : new String(ret); } });
Redis 的持久化方式
Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 「RDB」 和 「AOF」。
「RDB」:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。
「AOF」:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。
这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 「RDB」 和 「AOF」 两种方式,兼顾性能和数据安全。
总结
本文介绍了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH
/BLMOVE
命令来实现一个线程安全且可靠的消息队列。
以上就是SpringBoot使用Redis实现消息队列的方法小结的详细内容,更多关于SpringBoot Redis消息队列的资料请关注脚本之家其它相关文章!