详解RabbitMQ延迟队列的基本使用和优化
作者:不断前进的皮卡丘
1.延迟队列基本介绍
一般队列中的元素总是希望能够早点被取出来进行处理,但是延迟队列中的元素则是希望可以在指定时间内被取出和处理,延迟队列中的元素都是带有时间属性的。延迟队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列就是想要消息延迟一段时间后被处理,TTL可以让消息在延迟一段时间后变成死信。变成死信的消息都会被投递到死信队列中,这样的话,只要消费者一直消费死信队列里面的消息就可以了,因为里面的消息都是希望被马上处理的消息 生产者生产一条延时消息,根据需要延时时间的不同,通过不同的routing key把消息路由到不同的延迟队列,每一个队列都设置了不同的TTL属性,并且绑定在同一个死信交换机中,消息过期了以后,根据routing key的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理就可以了。注意:不要造成重复消费
2.延迟队列使用场景
下面的场景需要使用延迟队列
- 订单在十分钟内没有支付就自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 账单在一周内没有支付,就会自动结算
- 用户注册成功以后,如果三天内没有登录就进行短信题提醒
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议以后,需要提前十分钟通知各个参会人员参加会议。
3.Spring Boot集成RabbitMQ
3.1创建项目,引入依赖
相关依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
3.2application.properties配置文件
# RabbitMQ/配置 #服务器地址 spring.rabbitmq.host=服务器地址 #服务端口号 spring.rabbitmq.port=5672 #虚拟主机名称 spring.rabbitmq.virtual-host=/myhost #用户名 spring.rabbitmq.username=admin #密码 spring.rabbitmq.password=123456
3.3 队列TTL-代码结构图
3.4MQ配置类
package com.zyh.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author zengyihong * @create 2022--10--04 16:44 */ @Configuration public class TtlQueueConfiguration { //普通交换机 public static final String X_EXCHANGE = "X"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //死信队列QD public static final String QUEUE_D = "QD"; /** * 声明普通交换机X * * @return */ @Bean public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 声明队列QA * * @return */ @Bean public Queue queueA() { //创建集合保存队列属性 Map<String, Object> map = new HashMap<>(); //设置该队列绑定的死信交换机名称 map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置routing key map.put("x-dead-letter-routing-key", "YD"); //设置队列延迟时间 10秒 map.put("x-message-ttl", 10000); //创建队列 return QueueBuilder.durable(QUEUE_A).withArguments(map).build(); } /** * 把QA队列和交换机X进行绑定 * * @return */ @Bean public Binding queueA_BindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA"); } /** * 声明队列QB * * @return */ @Bean public Queue queueB() { //创建集合保存队列属性 Map<String, Object> map = new HashMap<>(); //设置该队列绑定的死信交换机名称 map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置routing key map.put("x-dead-letter-routing-key", "YD"); //设置队列延迟时间 10秒 map.put("x-message-ttl", 40000); //创建队列 return QueueBuilder.durable(QUEUE_A).withArguments(map).build(); } /** * 把QB队列和交换机X进行绑定 * * @return */ @Bean public Binding queueB_BindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB"); } /** * 声明死信交换机Y * * @return */ @Bean public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 声明死信队列QD * * @return */ @Bean public Queue queueD() { return new Queue(QUEUE_D); } /** * 把死信交换机和死信队列进行绑定 * @param queue * @param exchange * @return */ @Bean public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD"); } }
3.5生产者代码
@Slf4j @RestController @RequestMapping("/ttl") public class SendMessageController { @Resource private RabbitTemplate rabbitTemplate; /** * 生产者发送消息 * @param message */ @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ //记录日志 log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message); //给QA队列发送消息 rabbitTemplate.convertSendAndReceive("X","XA", "消息来自TTL为10秒的队列:"+message); rabbitTemplate.convertSendAndReceive("X","XB", "消息来自TTL为40秒的队列:"+message); } }
3.6消费者代码
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D) public void receiveQD(Message message, Channel channel){ //获取消息 String msg=new String(message.getBody()); log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg); } }
3.7测试
启动boot项目,在浏览器输入localhost:8080/ttl/sendMessage/Hello
但是这种方式有一种缺点,现在我们只有TTL为10s和40s的延迟队列,如果我们需要其他延时时间的队列的话,那么我们又得新增其他队列,这样其实并不方便,我们想要的是能够动态设置TTL,这样就不需要为每个TTL设置新的延迟队列了。
4.延迟队列优化
4.1代码结构图
4.2配置类
在之前写的代码基础上新增一个配置类
package com.zyh.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author zengyihong * @create 2022--10--05 10:44 */ @Configuration public class MessageTtlQueueConfiguration { //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_C = "QC"; /** * 声明QC队列 * @return */ @Bean public Queue queueC(){ //创建集合保存队列属性 Map<String, Object> map = new HashMap<>(); //设置该队列绑定的死信交换机名称 map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置routing key map.put("x-dead-letter-routing-key", "YD"); //设置队列延迟时间 10秒 map.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_C).withArguments(map).build(); } /** * 把QC队列和正常交换机X进行绑定 * * @return */ @Bean public Binding queueC_BindingX(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC"); } }
4.3生产者
package com.zyh.controller; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import javax.annotation.Resources; import java.util.Date; /** * @author zengyihong * @create 2022--10--04 19:36 */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMessageController { @Resource private RabbitTemplate rabbitTemplate; /** * 生产者发送消息 * * @param message */ @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { //记录日志 log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), message); //给QA队列发送消息 rabbitTemplate.convertSendAndReceive("X", "XA", "消息来自TTL为10秒的队列:" + message); rabbitTemplate.convertSendAndReceive("X", "XB", "消息来自TTL为40秒的队列:" + message); } /** * 生产者发送消息(动态设置有效期) * * @param message */ @GetMapping("/sendMessage/{message}/{ttlTime}") public void sendMessage(@PathVariable String message, @PathVariable String ttlTime) { MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息有效期 message.getMessageProperties().setExpiration(ttlTime); return message; } }; //记录日志 log.info("当前时间:{},发送一条时长{}毫秒信息给队列QC:{}", new Date(),ttlTime, message); //给QC队列发送消息 rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor); } }
4.4消费者
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D) public void receiveQD(Message message, Channel channel){ //获取消息 String msg=new String(message.getBody()); log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg); } }
4.5测试
启动boot项目
在浏览器输入
http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000
如果在消息属性上设置TTL的方式,那么消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
到此这篇关于详解RabbitMQ延迟队列的基本使用和优化的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!