RabbitMQ中的死信队列(Dead Letter Exchanges)详解
作者:warybee
这篇文章主要介绍了RabbitMQ中的死信队列(Dead Letter Exchanges)详解,当RabbitMQ出现死信,可能会导致业务逻辑错误,比如下订单后修改库存操作,在下单后因为某种原因,发送的消息未被签收,这时库存数据会出现不一致,需要的朋友可以参考下
RabbitMQ死信队列
1. 介绍
当消息在一个队列中变为死信后,它被重新发送到另一个Exchange。
2. 在什么情况下会出现死信
- 消息未被签收,在消费端使用了 basic.reject 或 basic.nack ,并且requeue设置为false
- 消息过期(TTL)
- 消息队列达到了最大长度
3. 实际应用
当RabbitMQ出现死信,可能会导致业务逻辑错误,比如下订单后修改库存操作,在下单后因为某种原因,发送的消息未被签收,这时库存数据会出现不一致。
有死信队列之后我们就可以监听死信队列,来处理业务逻辑。
3.1 死信队列设置
声明队列,添加参数x-dead-letter-exchange
Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments);
死信队列,是一个普通的Exchange和queue,需要设置死信Exchange和queue,并进行绑定
/要进行死信队列的声明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //可以匹配任意routeKey channel.queueBind("dlx.queue", "dlx.exchange", "#");
4 代码实现
生产端
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.test"; String msg = "RabbitMQ DLX Message test"; for(int i =0; i<1; i ++){ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") //过期时间为1秒 .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }
消费端
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 这就是一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //可以匹配任意routeKey channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); } }); }
运行以上代码,在交换机(Exchange)中会多出一个名为dlx.exchange 类型为topic的交换机,队列中也有一个dlx.queue 队列。
到此这篇关于RabbitMQ中的死信队列(Dead Letter Exchanges)详解的文章就介绍到这了,更多相关RabbitMQ死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!