Springboot使用Rabbitmq的延时队列+死信队列实现消息延期消费
作者:WalkerShen
本文介绍了RabbitMQ的延时队列和死信队列,解释了它们的工作原理及其应用场景,延时队列允许消息在设定的时间后被消费,结合实际案例,展示了如何实现和使用延时队列和死信队列,感兴趣的朋友一起看看吧
简介
RabbitMQ 的延时队列(Delayed Queue)是指消息在发送到队列后,会在一定的时间内被延迟处理,直到预设的延迟时间结束,消息才会被消费者消费。RabbitMQ 本身并不原生支持延时队列,但我们可以通过一些插件或特定配置来实现这种功能。
延时队列的应用场景
- 定时任务:比如定时发送邮件、推送通知等操作。
- 重试机制:当某些任务失败时,可以让任务在一段时间后再次尝试。
- 延迟消费:在特定时间后消费消息,而不是立即消费。
死信队列(Dead Letter Queue,简称 DLQ)是消息队列中的一个特殊队列,用于存放那些因某些原因无法被正常消费的消息。死信队列在许多消息队列系统(如 RabbitMQ、Kafka 等)中都有应用。通过将无法消费的消息转移到死信队列中,可以帮助开发者发现和处理消息消费过程中的异常或错误。
死信队列的主要用途是:
- 错误处理:用于捕获无法消费的消息,避免这些消息丢失,并提供后续人工干预或自动重试的机会。
- 监控和诊断:通过查看死信队列,可以帮助开发者快速发现问题,例如队列被溢出、消息格式不正确等。
- 防止数据丢失:将那些因为超时、队列溢出、消费失败等原因无法处理的消息放入死信队列,可以保证它们不会丢失,后续可以进一步调查和处理。
本文结合延时队列和死信队列,延时队列的消息设置过期时间,在时间过期之后,将消息内容推送到死信队列中进行处理。
整合逻辑
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.27</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.7.7</version> </dependency>
配置
server: port: 8081 spring: application: name: walker-rabbitmq rabbitmq: host: 127.0.0.1 port: 5672 username: guest_walker password: guest
创建交换机|队列
延时队列
package com.walker.rabbitmq.delay.config; import com.walker.rabbitmq.delay.constants.BaseConstant; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class OrderDelayRabbitConfig { // 交换机 @Bean public DirectExchange orderDelayExchange(){ return new DirectExchange(RabbitMqConfigEnum.ORDER_DELAY.getExchange()); } // 队列 @Bean public Queue orderDelayQueue(){ Map<String,Object> args = new HashMap<>(); // 延时队列,需要绑定死信队列,在ttl到期之后,将队列转到死信队列 args.put(BaseConstant.xDeadLetterExchange,RabbitMqConfigEnum.ORDER_DEAD.getExchange()); args.put(BaseConstant.xDeadLetterRoutingKey,RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey()); return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DELAY.getQueue()).autoDelete().withArguments(args).build(); } // @Bean public Binding orderDelayQueueBinding(){ return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(RabbitMqConfigEnum.ORDER_DELAY.getRoutingKey()); } }
死信队列
package com.walker.rabbitmq.delay.config; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class OrderDeadRabbitConfig { // 交换机 @Bean public DirectExchange orderDeadExchange(){ return new DirectExchange(RabbitMqConfigEnum.ORDER_DEAD.getExchange()); } // 队列 @Bean public Queue orderDeadQueue(){ return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DEAD.getQueue()).autoDelete().build(); } // 交换机、队列、路由键绑定 @Bean public Binding orderDeadQueueBinding(){ return BindingBuilder.bind(orderDeadQueue()).to(orderDeadExchange()) .with(RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey()); } }
常量
package com.walker.rabbitmq.delay.constants; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; public interface BaseConstant { String xDeadLetterExchange = "x-dead-letter-exchange"; String xDeadLetterRoutingKey = "x-dead-letter-routing-key"; }
相关枚举
package com.walker.rabbitmq.delay.enums; import lombok.AllArgsConstructor; import lombok.Getter; @AllArgsConstructor @Getter public enum RabbitMqConfigEnum { ORDER_DELAY("direct","订单延时队列","order_delay_exchange", "order_delay_queue", "order_delay_routing_key",true), ORDER_DEAD("direct","订单死信队列","order_dead_exchange", "order_dead_queue", "order_dead_routing_key",true); private final String type; private final String title; private final String exchange; private final String queue; private final String routingKey; private final Boolean durable; }
rabbitmq工具类封装
package com.walker.rabbitmq.delay.service; import cn.hutool.core.util.BooleanUtil; import cn.hutool.json.JSONUtil; import com.rabbitmq.client.Channel; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.function.Consumer; import java.util.function.Function; @Component @Slf4j public class RabbitmqService { @Resource private RabbitTemplate rabbitTemplate; /** * author:walker * time: 2024/12/30 * description: 无返回结果的手动处理方式 */ public void doWithManualAck(String event, Message message, Channel channel, Consumer<String> method) { String data = new String(message.getBody()); try { method.accept(data); sendAck(message, channel); } catch (Exception e) { log.error("处理事件[{}]异常数据为:{},原因如下:",event,data, e); sendNack(message, channel); } } /** * author:walker * time: 2024/12/30 * description: 根据返回的布尔类型,进行确认和拒绝 */ public void doWithManualAck(String event, Message message, Channel channel, Function<String,Boolean> method) { String data = new String(message.getBody()); try { Boolean res = method.apply(data); if(BooleanUtil.isTrue(res)){ sendAck(message, channel); }else{ sendNack(message, channel); } } catch (Exception e) { log.error("处理事件[{}]异常数据为:{},原因如下:",event,data, e); sendNack(message, channel); } } // 确认消息 public void sendAck(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } // 拒绝消息 public void sendNack(Message message, Channel channel) { try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException e) { log.error("消息消费失败,消息ID:{}", message.getMessageProperties().getMessageId(), e); } } public <T> void sendDelayMsg(RabbitMqConfigEnum configEnum, T data, Integer delayTime) { rabbitTemplate.convertAndSend(configEnum.getExchange(), configEnum.getRoutingKey(), JSONUtil.toJsonStr(data), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(delayTime)); return message; } }); } }
案例
实体类
package com.walker.rabbitmq.delay.entity; import lombok.Data; @Data public class OrderInfo { private String productName; private Integer num; private String userName; private String orderTime; }
controller 创建订单
package com.walker.rabbitmq.delay.controller; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; import com.walker.rabbitmq.delay.entity.OrderInfo; import com.walker.rabbitmq.delay.service.RabbitmqService; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Date; @Slf4j @RestController @RequestMapping("/order") public class OrderController { @Resource private RabbitmqService rabbitmqService; /** * author:walker * time: 2025/1/2 * description: 下单 */ @GetMapping("/create") public void createOrder() { OrderInfo orderInfo = new OrderInfo(); orderInfo.setProductName("羽毛球"); orderInfo.setUserName("张三"); orderInfo.setOrderTime("2025-01-02 12:00:00"); orderInfo.setNum(1); log.info("发送消息时间{}",new Date()); rabbitmqService.sendDelayMsg(RabbitMqConfigEnum.ORDER_DELAY,orderInfo,10000); } }
消费者监听消息
消费者接口
package com.walker.rabbitmq.delay.service; public interface IRabbitCosumerHandler { Boolean handle(String message); }
消费者监听方法
package com.walker.rabbitmq.delay.listener; import com.rabbitmq.client.Channel; import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum; import com.walker.rabbitmq.delay.constants.BaseConstant; import com.walker.rabbitmq.delay.service.IRabbitCosumerHandler; import com.walker.rabbitmq.delay.service.RabbitmqService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.Date; @Slf4j @Component public class OrderDeadLetterQueueConsumer implements IRabbitCosumerHandler { @Resource private RabbitmqService rabbitmqService; @RabbitListener(queues = "#{T(com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum).ORDER_DEAD.getQueue()}",ackMode = "MANUAL") public void receive(Message message, Channel channel) throws IOException { log.info("接受消息时间 {}",new Date()); String msg = new String(message.getBody()); log.info("当前时间:{},队列{}收到消息:{}", new Date().toString(),RabbitMqConfigEnum.ORDER_DEAD.getQueue(), msg); rabbitmqService.doWithManualAck(RabbitMqConfigEnum.ORDER_DEAD.getTitle(), message, channel, this::handle); } @Override public Boolean handle(String message) { // 处理逻辑 log.info("进行订单取消的操作"); return true; } }
到此这篇关于Springboot使用Rabbitmq的延时队列+死信队列实现消息延期消费 的文章就介绍到这了,更多相关Springboot使用Rabbitmq消息延期消费 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!