SpringBoot使用RabbitMQ延时队列(小白必备)
作者:一元情书 释
1.什么是MQ
MQ,是一种跨进程的通信机制,用于上下游传递消息。
在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。
使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
为什么会产生消息列队?
- 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
- 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
延时列队的使用场景?
- 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。
- 短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息
2.什么是RabbitMQ(这里就做了一下简单介绍)
RabbitMQ是一种消息队列 ,用于常见的进程通信。支持点对点,请求应答和发布订阅模式 并且提供多种语言的支持。常见的java,c#,php都支持。
常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。
下载地址:
https://www.rabbitmq.com/download.html
进入RabbitMQ官网
1.第一步
第二步
下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang
下载地址:http://www.erlang.org/download/otp_win64_17.3.exe
安装步骤
步骤一
步骤二
步骤三
步骤四
安装完成
现在安装RabbitMQ
步骤一
步骤二
步骤三
安装完成
启动RabbitMQ管理工具
开始菜单 — 最新添加 — 展开 — 选中双击
输入命令:rabbitmq-plugins enable rabbitmq_management
效果如果图
在浏览器中输入地址查看:http://127.0.0.1:15672/
出现次页面代表成功,默认用户和密码都是guest/ guest
若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):
Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。
出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。
接下来我们先卸载我们的RabbitMQ,然后在我们的系统变量里设置一个RABBITMQ_BASE 的变量路径为一个不含英文的路径 比如 E:\rabbit,最后我们重新安装RabbitMQ即可,然后就会看到RabbitMQ服务自动注册了,并且不会自动停止。
SpringBoot整合RabbitMQ
1.添加依赖
pom.xml中添加 spring-boot-starter-amqp的依赖
<!-- spring-boot-starter-amqp的依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
其他依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency>
application.yml文件中配置rabbitmq相关内容
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
这里我们环境就搭建起来了
2.具体编码实现
配置列队
package com.example.spring_boot_rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author:zq * @date: Greated in 2019/12/19 11:46 * 配置队列 */ @Configuration @Slf4j public class DelayRabbitConfig { /** * 延迟队列 TTL 名称 */ private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue"; /** * DLX,dead letter发送到的 exchange * 延时消息就是发送到该交换机的 */ public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange"; /** * routing key 名称 * 具体消息发送在该 routingKey 的 */ public static final String ORDER_DELAY_ROUTING_KEY = "order_delay"; public static final String ORDER_QUEUE_NAME = "user.order.queue"; public static final String ORDER_EXCHANGE_NAME = "user.order.exchange"; public static final String ORDER_ROUTING_KEY = "order"; /** * 延迟队列配置 * <p> * 1、params.put("x-message-ttl", 5 * 1000); * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先) * 2、rabbitTemplate.convertAndSend(book, message -> { * message.getMessageProperties().setExpiration(2 * 1000 + ""); * return message; * }); * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制 **/ @Bean public Queue delayOrderQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY); return new Queue(ORDER_DELAY_QUEUE, true, false, false, params); } /** * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发, * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 * @return DirectExchange */ @Bean public DirectExchange orderDelayExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY); } @Bean public Queue orderQueue() { return new Queue(ORDER_QUEUE_NAME, true); } /** * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。 * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。 **/ @Bean public TopicExchange orderTopicExchange() { return new TopicExchange(ORDER_EXCHANGE_NAME); } @Bean public Binding orderBinding() { // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键 return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY); } }
创建一个Order实体类
package com.example.spring_boot_rabbitmq.pojo; import lombok.Data; import java.io.Serializable; /** * @author:zq * @date: Greated in 2019/12/19 11:49 */ @Data public class Order implements Serializable { private static final long serialVersionUID = -2221214252163879885L; private String orderId; // 订单id private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消 private String orderName; // 订单名字 }
接收者
package com.example.spring_boot_rabbitmq; import com.example.spring_boot_rabbitmq.pojo.Order; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * @author:zq * @date: Greated in 2019/12/19 11:53 * 接收者 */ @Component @Slf4j public class DelayReceiver { @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME}) public void orderDelayQueue(Order order, Message message, Channel channel) { log.info("###########################################"); log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), order.toString()); if(order.getOrderStatus() == 0) { order.setOrderStatus(2); log.info("【该订单未支付,取消订单】" + order.toString()); } else if(order.getOrderStatus() == 1) { log.info("【该订单已完成支付】"); } else if(order.getOrderStatus() == 2) { log.info("【该订单已取消】"); } log.info("###########################################"); } }
发送者
package com.example.spring_boot_rabbitmq; import com.example.spring_boot_rabbitmq.pojo.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @author:zq * @date: Greated in 2019/12/19 11:55 * 发送者 */ @Component @Slf4j public class DelaySender { @Autowired private AmqpTemplate amqpTemplate; public void sendDelay(Order order) { log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() ); this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> { // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间 message.getMessageProperties().setExpiration(1 * 1000 * 60 + ""); return message; }); } }
测试,访问http://localhost:8080/sendDelay查看日志输出
package com.example.spring_boot_rabbitmq; import com.example.spring_boot_rabbitmq.pojo.Order; import org.springframework.web.bind.annotation.RestController; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; /** * @author:zq * @date: Greated in 2019/12/19 11:57 * 测试 */ @RestController public class TestController { @Autowired private DelaySender delaySender; @GetMapping("/sendDelay") public Object sendDelay() { Order order1 = new Order(); order1.setOrderStatus(0); order1.setOrderId("123456"); order1.setOrderName("小米6"); Order order2 = new Order(); order2.setOrderStatus(1); order2.setOrderId("456789"); order2.setOrderName("小米8"); delaySender.sendDelay(order1); delaySender.sendDelay(order2); return "ok"; } }
输出
到此已经SpringBoot使用RabbitMQ延时队列已经完成,希望对你有所帮助,若有地方不理解或者有更好的办法请留言,谢谢。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。