RabbitMQ的ACK确认机制保障消费端消息的可靠性详解
作者:warybee
这篇文章主要介绍了RabbitMQ的ACK确认机制保障消费端消息的可靠性详解,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把,需要的朋友可以参考下
1. 概述
如果消费端在你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
生产端消息可靠性保证可以使用RabbitMQ的confirm机制。
2. ACK机制与消费端消息补偿机制
把channel.basicConsume(...)方法的autoAck参数改为false
channel.basicAck(long deliveryTag, boolean multiple);方法,消费成功签收
参数说明:
- deliveryTag:消息标识
- multiple:是否批量签收
basicNack(long deliveryTag, boolean multiple, boolean requeue) ,消息消费失败
参数说明:
- deliveryTag:消息标识
- multiple:是否批量签收
- requeue:true 消息会重回队列,false 消息会进入到死信队列
3. 代码演示
生产端
public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟主机 connectionFactory.setVirtualHost("/"); //创建一个链接 Connection connection = connectionFactory.newConnection(); //创建channel Channel channel = connection.createChannel(); String exchangeName="test_ack_exchange"; String routeKey="ack.test"; for (int i=0;i<5;i++){ Map<String, Object> headers = new HashMap<String, Object>(); //演示重回队列机制,使用num==0的消息签收失败重回队列 headers.put("num", i); AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); String msg="RabbitMQ send message ack test!"+i; channel.basicPublish(exchangeName,routeKey,properties,msg.getBytes()); } }
消息端
public static void main(String[] args) throws Exception{ System.out.println("======消息接收start=========="); ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟主机 connectionFactory.setVirtualHost("/"); //创建链接 Connection connection = connectionFactory.newConnection(); //创建channel Channel channel = connection.createChannel(); String exchangeName="test_ack_exchange"; String exchangeType="topic"; //声明Exchange channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); String queueName="test_ack_queue"; //声明队列 channel.queueDeclare(queueName,true,false,false,null); String routeKey="ack.#"; //绑定队列和交换机 channel.queueBind(queueName,exchangeName,routeKey); /** * autoAck:false 设置为手工签收 */ channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); try { Thread.sleep(3000); //休眠5秒 } catch (InterruptedException e) { e.printStackTrace(); } //演示重回队列机制,使用num==0的消息签收失败重回队列 if((Integer)properties.getHeaders().get("num") == 0) { /** * 参数说明:1、消息标识 2、是否批量签收 3、是否重回队列 */ channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }); }
运行代码以上后,由于在消费端,设置了第一条消息,签收失败重回队列,在RabbitMQ控制台中我们可以看到始终有一条消息未签收确认
到此这篇关于RabbitMQ的ACK确认机制保障消费端消息的可靠性详解的文章就介绍到这了,更多相关RabbitMQ的ACK确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!