Java中RabbitMQ延迟队列实现详解
作者:CD4356
这篇文章主要介绍了Java中RabbitMQ延迟队列实现详解,消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可,需要的朋友可以参考下
一、RabbitMQ延迟队列实现
1.1、RabbitMQ延迟队列实现流程
- 生产者生产一条延迟消息,根据延迟时间的不同,利用不同的routing-key将消息路由到不同的延迟队列,每个队列都设置了不同的 TTL 属性 ( TTL ( Time To Live ) 生存时间 ),并绑定到同一个死信交换机中。
- 消息过期后,根据routing-key的不同,又会被死信交换机路由到不同的死信队列中,消费者只需要监听对应的死信队列进行消费即可。
1.2、配置RabbitMQ连接
#[ RabbitMQ相关配置 ] #rabbitmq服务器IP spring.rabbitmq.host=安装RabbitMQ的服务器IP #rabbitmq服务器端口(默认为5672) spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #用户密码 spring.rabbitmq.password=guest #虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列) #vhost虚拟主机地址( 默认为/ ) spring.rabbitmq.virtual-host=/
1.3、创建配置类
配置两个交换机、四个队列、以及根据路由键配置交换机和队列的绑定关系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfiguration { //延迟交换机 public static final String DELAY_EXCHANGE = "delay_exchange"; //延迟队列A public static final String DELAY_QUEUE_A = "delay_queue_a"; //延迟队列B public static final String DELAY_QUEUE_B = "delay_queue_b"; //延迟路由键10S public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key"; //延迟路由键60S public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key"; //死信交换机 public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange"; //死信队列A public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a"; //死信队列B public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b"; //死信路由键10S public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key"; //死信路由键60S public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key"; //延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE, true, false); } //延迟队列A @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", 10*1000); return new Queue(DELAY_QUEUE_A, true, false, false, args); } //延迟队列B @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", 60*1000); return new Queue(DELAY_QUEUE_B, true, false, false, args); } //延迟队列A的绑定关系 @Bean("delayBindingA") public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY); } //延迟队列B的绑定关系 @Bean("delayBindingB") public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false); } //死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUE_A, true); } //死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUE_B, true); } //死信队列A的绑定关系 @Bean("deadLetterBindingA") public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY); } //死信队列B的绑定关系 @Bean("deadLetterBindingB") public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY); } }
1.4、创建一个枚举类来配置延迟类型
@Getter @AllArgsConstructor public enum DelayTypeEnum { //10s DELAY_10s(1), //60s DELAY_60s(2); private Integer type; /** * 延迟类型 * @param type * @return 延迟类型 */ public static DelayTypeEnum getDelayTypeEnum(Integer type){ if(Objects.equals(type, DELAY_10s.type)){ return DELAY_10s; } if(Objects.equals(type, DELAY_60s.type)){ return DELAY_60s; } return null; } }
1.5、创建生产者类发送消息
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY; /** * 延迟消息生产者 */ @Component public class DelayMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送延迟消息 * @param message 要发送的消息 * @param type 延迟类型(延时10s的延迟队列 或 延时60s的延迟队列) */ public void sendDelayMessage(String message, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message); break; default: break; } } }
1.6、创建消费者类消费消息
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B; @Slf4j @Component public class DeadLetterQueueConsumer { @Autowired private RabbitTemplate rabbitTemplate; /** * 监听死信队列A * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_a") @RabbitListener(queues = DEAD_LETTER_QUEUE_A) public void receiveA(Message message) { String msg = new String(message.getBody()); // 记录日志 log.info("当前时间:{},死信队列A收到的消息:{}", LocalDateTime.now(), msg); } /** * 监听死信队列B * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_b") @RabbitListener(queues = DEAD_LETTER_QUEUE_B) public void receiveB(Message message){ String msg = new String(message.getBody()); // 记录日志 log.info("当前时间:{},死信队列B收到的消息:{}", LocalDateTime.now(), msg); } }
1.7、创建控制类
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import com.cd.springbootrabbitmq.producer.DelayMessageProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.Objects; @Slf4j @RestController @RequestMapping("/rabbitmq") public class RabbitMQController { @Autowired private DelayMessageProducer producer; @RequestMapping("/send") public void send(String message, Integer delayType){ // 记录日志 log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, delayType); // 发送延迟消息 producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType))); } }
1.8、测试
在浏览器中先后提交下面两个请求:
1)localhost:8080/rabbitmq/send?message=测试自定义延迟处理60s&delayType=2
2)localhost:8080/rabbitmq/send?message=测试自定义延迟处理10s&delayType=1
查看idea控制台:
到此这篇关于Java中RabbitMQ延迟队列实现详解的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!