深入分析RabbitMQ中死信队列与死信交换机
作者:小白的救赎
这篇文章主要介绍了RabbitMQ中死信队列与死信交换机,死信队列就是一个普通的交换机,有些队列的消息成为死信后,一般情况下会被RabbitMQ清理,感兴趣想要详细了解可以参考下文
介绍
DLX(Dead Letter Exchange)死信交换机。当消息成为Dead Message时,可以被重新发送到另一个交换机。这个交换机就是死信交换机。这里主要有两个问题:第一是消息如何判断为死信消息,第二则是消息如何从队列中传到死信交换机。
消息成为死信消息的三种情况
- 队列消息长度达到限制
- 消费者拒收消息(basicNack()或basicReject),且不把消息重新放回队列(basicNack()方法第三个参数)
- 消息TTL过期下面我演示的就是这种情况
队列绑定死信交换机
- 给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key
生产者端
目录结构
导入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-returns: true # 开启退回回调
#三个类型:none默认不开启确认回调 correlated开启确认回调
#simple也会确认回调 还会调用waitForConfirms()方法或waitForConfirmsOrDie()方法
publisher-confirm-type: correlated # 开启确认回调
业务逻辑
/** * 定义交换机与队列的Bean 并且使之绑定 * 生产者 -> 普通交换机 -> 普通队列 -> 消费者1 * | * -> 死信交换机 -> 死信队列 -> 消费者2 * 需要两个交换机两个队列、三个路由键。两个普通之间我用了"test.#" * 普通与死信之间用了"test.dead.heHe" 两个死信之间用了"test.dead.#" 生产的消息用的就是"test.dead.heHe" */ @Component public class RabbitMQConfig { public static final String EXCHANGE_NAME = "test_exchange_name"; public static final String QUEUE_NAME = "test_queue_name"; public static final String DEAD_EXCHANGE_NAME = "dead_exchange_name"; public static final String DEAD_QUEUE_NAME = "dead_queue_name"; @Bean("testExchange") public Exchange testExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("deadExchange") public Exchange deadExchange(){ return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build(); } //普通队列绑定死信交换机并且带上路由键 为了实现死信消息于是配置队列TTL @Bean("testQueue") public Queue testQueue(){ return QueueBuilder.durable(QUEUE_NAME).ttl(5000).deadLetterExchange(DEAD_EXCHANGE_NAME) .deadLetterRoutingKey("test.dead.heHe").build(); } @Bean("deadQueue") public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE_NAME).build(); } @Bean public Binding link(@Qualifier("testExchange") Exchange exchange, @Qualifier("testQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs(); } @Bean public Binding deadLink(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("test.dead.#").noargs(); } }
@SpringBootTest @RunWith(SpringRunner.class) class RabbitmqProducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void testProducer() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if(b) System.out.println("交换机成功接受到了消息"); else System.out.println("消息失败原因" + s); } }); // 设置交换机处理失败消息的模式 // true:消息到达不了队列时 会将消息重新返回给生产者 false:消息到达不了队列直接丢弃 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("队列接受不到交换机的消息进行了失败回调"); } }); for(int i = 0; i < 10; ++i){ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dead.heHe","HelloWorld"); } } }
到此这篇关于深入分析RabbitMQ中死信队列与死信交换机的文章就介绍到这了,更多相关RabbitMQ死信队列与死信交换机内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!