如何利用rabbitMq的死信队列实现延时消息
作者:好大的月亮
这篇文章主要介绍了如何利用rabbitMq的死信队列实现延时消息问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
前言
使用mq
自带的死信去实现延时消息要注意一个坑点,就是mq
只会检测队首的消息的过期时间,假设先放入队列10s
过期消息,再放入2s
过期。
mq
会检测头部10s
是否过期,10s
不过期的情况下,2s
就算过去也不会跑到死信.
mq基本的消息模型
mq死信队列的消息模型
简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。
然后监听这个死信队列的消费.
一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)
通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.
maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
配置普通队列和死信队列
yml文件
spring: application: name: ttl-queue rabbitmq: host: 110.40.181.73 port: 35672 username: root password: 10086 virtual-host: /fchan connection-timeout: 15000 # 发送确认 #publisher-confirms: true # 路由失败回调 publisher-returns: true template: # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃 mandatory: true listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 签收模式为手动签收-那么需要在代码中手动ACK acknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabConfig { //定义普通任务队列(其实就是一个普通队列,只是没有消费者) @Bean("delayQue") public Queue delayQue(){ //普通队列绑定死信交换机及路由key //delayQueue延时队列 return QueueBuilder.durable("delayQueue") //指定这个延时队列下的死信交换机 //如果消息过时则会被投递到当前对应的my-dlx-exchange .withArgument("x-dead-letter-exchange", "my-dlx-exchange") //死信交换机绑定的路由key //如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列 //mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式 .withArgument("x-dead-letter-routing-key", "routing-key-delay") .build(); } //定义普通队列的交换机 @Bean("delayExchange") public Exchange delayExchange(){ return ExchangeBuilder.directExchange("delayExchange").build(); } //绑定普通队列的交换机 @Bean("delayBinding") public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs(); } //定义死信队列 @Bean("dlxQueue") public Queue dlxQueue(){ return QueueBuilder.durable("my-dlx-queue").build(); } //定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致 @Bean("dlxExchange") public Exchange dlxExchange(){ return ExchangeBuilder.directExchange("my-dlx-exchange").build(); } //绑定交换机与死信队列 @Bean("dlxBinding") public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){ //这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致 return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs(); } }
死信队列消费者
package com.fchan.mq.mqDelay; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component public class MyRabbitConsume { Logger log = LoggerFactory.getLogger(MyRabbitConsume.class); @RabbitListener(queues = {"my-dlx-queue"}) public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception { log.info("收到死信信息:{}",map); log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ"); //手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
发送消息测试
/** * 发送消息到延时队列 delayQueue * 消息5秒后会被投递到死信队列 * @return */ @GetMapping("sendMsgToDelay") public String sendMsgToDelay(String msg){ Map<String,Object> map = new HashMap<>(); map.put("msg", msg); rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息过期时间,5秒过期 message.getMessageProperties().setExpiration("5000"); return message; } }); return msg; }
测试成功
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。