RabbitMQ中的延迟队列机制详解
作者:煎丶包
一、延迟队列
延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
二、队列TTL
创建一个配置类,声明并配置交换机和队列
@Configuration public class TtlQueueConfig { //普通交换机名称 public static final String NORMAL_EXCHANGE = "X"; //死信交换机名称 public static final String DEAD_EXCHANGE = "Y"; //普通队列名称 public static final String NORMAL_QUEUE_A = "QA"; public static final String NORMAL_QUEUE_B = "QA"; //死信队列名称 public static final String DEAD_QUEUE = "QD"; //声明普通交换机 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(NORMAL_EXCHANGE); } //声明死信交换机 @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(DEAD_EXCHANGE); } //声明普通队列,TTL为10s @Bean("QA") public Queue qA() { Map<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置TTL arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build(); } //声明普通队列,TTL为10s @Bean("QB") public Queue qB() { Map<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置TTL arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build(); } //声明死信队列 @Bean("QD") public Queue qD() { return QueueBuilder.durable(DEAD_QUEUE).build(); } //绑定对应的交换机和队列 @Bean public Binding queueABindingX(@Qualifier("QA") Queue QA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QA).to(xExchange).with("XA"); } @Bean public Binding queueBBindingX(@Qualifier("QB") Queue QB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QB).to(xExchange).with("XB"); } @Bean public Binding queueDBindingY(@Qualifier("QD") Queue QD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(QD).to(yExchange).with("YD"); } }
创建一个生产者
@Slf4j @RestController public class SendMsgController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/ttl/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("当前时间:{}, 发送一条信息给两个队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列QA:" + message); rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列QB:" + message); } }
创建一个消费者
@Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receivedQD(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{}, 收到死信队列的消息:{}", new Date().toString(), message); } }
浏览器发送消息
消费者分别过了10s和40s接收到了消息
三、延迟队列的优化
不同的延迟时间需要设置不同的 TTL ,可以优化声明一个通用的 QC 队列,具体的延迟时间有生产者决定
在配置类 TtlQueueConfig 中配置通用队列 QC
//通用队列名称 public static final String Generic_QUEUE_C = "QC"; //声明通用队列 @Bean("QC") public Queue qC() { Map<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //因为是通用队列,所以不设置TTL,由生产者指定消息的TTL return QueueBuilder.durable(Generic_QUEUE_C).withArguments(arguments).build(); } //绑定通用队列和普通交换机 @Bean public Binding queueCBindingX(@Qualifier("QC") Queue QC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(QC).to(xExchange).with("XC"); } //绑定通用队列和死信交换机 @Bean public Binding queueCBindingY(@Qualifier("QC") Queue QC, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(QC).to(yExchange).with("YD"); }
生产者发送消息,并指定 TTL 时长
//发送消息,并指定消息的TTL @GetMapping("/ttl/sendExpirationMsg/{message}/{ttlTime}") public void sendExpirationMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{}, 发送一条TTL为{}ms的消息给队列QC:{}", new Date().toString(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { //设置消息的TTL时长 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); }
发送两条消息
消费者接收消息
但是,如果连续发送两条消息,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。结果会导致第二条消息消费者收到时间有误。
四、基于 RabbitMQ 插件实现延迟队列
如果不能实现在消息粒度上的 TTL ,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。可以使用基于 RabbitMQ 插件来实现延迟队列,从而解决这个问题。
基于 RabbitMQ 插件实现延迟,是交换机实现延迟,而不再是队列实现延迟
创建一个基于插件的延迟队列配置类 DelayedQueueConfig
@Configuration public class DelayedQueueConfig { //交换机名称 public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange"; //队列名称 public static final String DELAYED_QUEUE_NAME = "delayed_queue"; //routingKey public static final String DELAYED_ROUTING_KEY = "delayed_routingKey"; //声明交换机 @Bean public CustomExchange delayedExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); //设置延迟类型 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments); } //声明队列 @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //绑定队列和交换机 @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
创建生产者发送延迟消息
//基于插件发送消息 @GetMapping("/ttl/sendDelayedMsg/{message}/{delayedTime}") public void sendDelayedMsg(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) { log.info("当前时间:{}, 发送一条时长为{}ms的消息给延迟队列delayed_queue:{}", new Date().toString(), delayedTime, message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> { //设置消息的延迟时长 msg.getMessageProperties().setDelay(delayedTime); return msg; }); }
创建消费者
@Slf4j @Component public class DelayedQueueConsumer { //监听消息 @RabbitListener(queues = {DelayedQueueConfig.DELAYED_QUEUE_NAME}) public void receiveDelayQueue(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{}, 收到延迟队列的消息:{}", new Date().toString(), msg); } }
当连续发送两条不同延迟时长的消息时,消费者会先接收到延迟时长短的那条消息,再接收延迟时长长的那条消息。
实现延迟队列,一种是基于死信队列的方式,一种是基于RabbitMQ插件的方式。
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
到此这篇关于RabbitMQ中的延迟队列机制详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!