SpringBoot整合RabbitMQ实现延迟队列和死信队列
作者:哈弟撩编程
一、死信队列
RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。
当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。
死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。
死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:
延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。
消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。
异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。
消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。
消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。
消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。
RabbitMQ的工作模式
死信队列的工作模式
今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。
二、RabbitMQ相关的安装
win10安装rabbitMQ的详细步骤_java_脚本之家 (jb51.net)
我这里直接引用别人的文章了,下载需要大家去看一看。
RabbitMQ延迟插件的安装。
三、SpringBoot引入RabbitMQ
1.引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency>
2.创建队列和交换器
这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:
- 延迟队列
- 延迟队列的交换器
- 死信队列
- 死信队列的交换器
在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。
我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。
2.1 变量声明
接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。
// 延迟队列交换器名称 public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; // 延迟队列A名称 public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a"; // 延迟队列B名称 public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b"; // 延迟队列routingA名称 public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key"; // 延迟队列routingB名称 public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key"; // 死信队列 public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key"; public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key"; public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a"; public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";
2.2 创建延迟交换器
// 注册延迟交换器delayExchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); }
2.3 创建延迟队列
这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。
// 注册延迟队列A 还要绑定死信交换器和死信routingA @Bean("delayQueueA") public Queue delayQueueA(){ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY); //args.put("x-message-ttl",6000); return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build(); } // 注册延迟队列B 还要绑定死信交换器和死信routingB @Bean("delayQueueB") public Queue delayQueueB(){ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY); args.put("x-message-ttl",10000); return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build(); }
2.4 延迟队列绑定延迟交换器
// 延迟队列A绑定交换器 @Bean public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME); } // 延迟队列B绑定交换器 @Bean public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME); }
2.5 死信队列配置
与延迟队列不同的是,死信队列并没有配置延迟参数。
// 注册死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUE_A_NAME); } // 注册死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUE_B_NAME); } // 注册死信交换器 @Bean public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 死信队列A绑定死信交换器 @Bean public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){ return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY); } // 死信队列B绑定死信交换器 @Bean public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){ return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY); }
到此为止,RabbitMQ的组件配置完成。
3. 添加application.yml
server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
4. 添加RabbitMQListener (消费者)
下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。
@Component public class DeadLetterQueueConsumer { public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class); @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL") public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg); System.out.println(message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL") public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
5. 创建DelayMessageSender
这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。
@Component public class DelayMessageSender { @Resource RabbitTemplate rabbitTemplate; public void sendMessage(String msg,Integer delayTimes){ switch (delayTimes){ case 6: rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(6000)); return message; } }); break; case 10: rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg); break; } } }
6. 创建Controller
@RestController @RequestMapping("/student") public class StudentController { @Autowired DelayMessageSender messageSender; @RequestMapping("/send-message") public String sendMessage(String msg,Integer delayTimes){ System.out.println(new Date()); messageSender.sendMessage(msg,delayTimes); return "发送成功"; } }
7.测试
在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。
http://localhost:15672/
先来看看我们的初始队列。这里是什么都没有的。
然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。
使用PostMan发送一次请求。
我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。
接下来我们测试10s的延迟队列。
10s后死信队列B成功的接收到了消息。
四、死信队列的应用场景
延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。
1.订单创建:
用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。2.等待支付:
在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。3.订单超时处理:
如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。4.取消订单:
系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。5.定时任务(可选):
虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。
以上就是SpringBoot整合RabbitMQ实现延迟队列和死信队列的详细内容,更多关于SpringBoot RabbitMQ队列的资料请关注脚本之家其它相关文章!