详解RabbitMq如何做到消息的可靠性投递
前言
现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处:比如
- 削峰
这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请求
- 解耦合
比如现在有系统A,当系统A执行完成后,B、C系统需要拿到A系统的结果才可以继续执行,如果不引入MQ,A系统还要调用B、C系统,这样这A、B、C三个系统的耦合性就很大。引入MQ后A系统的执行结果只需要保证将消息投递到MQ就好,其它的两个系统只需要监听这个MQ的某个队列,这样就降低了这三个系统之间的耦合性。
- 异步
再通过A、B、C这三个系统举例。A系统在返回给用户的执行结果前需要完成B、C系统的调用,这个总的执行时间是A+B+C的执行时间,如果引入MQ,A系统的执行完成后将数据投递到MQ,直接响应用户。B、C再这在通过监听完成数据的处理。这样也降低了用户的等待时间
除了这些好处,当然引入MQ还会有不好的地方:比如
- 数据一致性问题
- A系统执行完将数据投递到了MQ,B、C在消费的时候如果出现了问题,是不是就导致了数据不一致的问题
- 可用性降低
- 一个好好的系统,引入一个MQ,如果这个MQ拓机了呢?这个可能就需要集群来提高MQ的高可用。
- 系统的复杂度提高
- 引入了MQ,我们还需要关注消息是否被成功的投递,MQ中的消息被积压太多怎么办?消费端是否成功的消费的消息。
这些都是问题,所在是否要引入MQ还需要看业务需求
RabbitMq的投递及消费流程
这里有张投递消息到消费的流程图
从这张图上可看出这也是一种AMQP协议的实现。消息的提供者先是通过某一个信道将消息发送到交换机,然后交换机通通RoutingKey来将消息分发到某一个队列上。然后,消费者在临听某一个队列来进行消息的消费。
今天我们的主题是如何保证消息的投递可靠性。那么我们来想想在这个流程中那些位置可能会影响我们消息的投递可靠性?
从上图中我们可以总结出有二个因素影响着消息是否被成功投递和被成功消费
提供者
- 提供者有没有将消成功的发送到MQ并被处理
- 发送到MQ中的消息有没有成功的被路由到队列中
消费者
- 消费者有没有成功的签收消息并成功处理。
- 消费者是否可以保证消费者的稳定性
提供者如何确保消息的成功投递
解决这个问题,我们可以通过提供者的发送方确认机制来实现,这个发送方确认机制又分成三种:
- 单条消息的同步确认
- 多条消息的同步确认
- 异步消息确认
单条消息的同步确认
首先要在当前的Channel上开启消息确认模式,然后通过waitForConfirms()方法进行消息确认是否发送成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost( "host" ); cf.setPort( 5672 ); cf.setUsername( "账号" ); cf.setPassword( "密码" ); try (Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put( "name" , "1111" ); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks" , "drinks.juzi" , null , messageStr.getBytes()); boolean isSendSuccess = channel.waitForConfirms(); if (isSendSuccess){ System.out.print( "消息发送成功" ); } } } |
这样做的话每次发完消息后,都会确保消息是否发送成功。如果发送失败的话进行相应的处理。
多条消息的同步确认
多条消息的确认和单条的差不多,比如我将发送消息的代码放到一个循环内。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost( "host" ); cf.setPort( 5672 ); cf.setUsername( "账号" ); cf.setPassword( "密码" ); try (Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put( "name" , "1111" ); String messageStr = objectMapper.writeValueAsString(mes); for ( int i = 0 ;i < 100 ;i++){ channel.basicPublish( "exchange.drinks" , "drinks.juzi" , null , messageStr()); } boolean isSendSuccess = channel.waitForConfirms(); System.out.println(isSendSuccess); } } |
这样的话当一批消息发送完成后,进行统一的消息确认是否发送成功,就成了多条的消息确认,不过并不推荐使用这种确认消息的方式
在多条的消息确认中,比如我先是发送了一批的消息,比如这批消息有100条,这个时候如果有其中的一条消息没有发送成功,这里返回的也是false,然尔我们并不能知道是具体的哪 一条消息发送失败。
异步消息确认
异步的消息确认是通过一个监听器来实现的,当消息发送后,会接着执行下面的逻辑,可能在稍会的一段时间,监听器监听到了Broker的返回,再进行逻辑的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost( "host" ); cf.setPort( 5672 ); cf.setUsername( "账号" ); cf.setPassword( "密码" ); try (Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); ConfirmListener confirmListener = new ConfirmListener() { @Override public void handleAck( long deliveryTag, boolean multiple) throws IOException { System.out.println( "发送成功:" + deliveryTag + " multiple:" + multiple); } @Override public void handleNack( long deliveryTag, boolean multiple) throws IOException { System.out.println( "发送失败:" + deliveryTag); } }; channel.addConfirmListener(confirmListener); Map<String,String> mes = new HashMap<>(); mes.put( "name" , "11111" ); String messageStr = objectMapper.writeValueAsString(mes); for ( int i = 0 ;i < 100 ;i++){ channel.basicPublish( "exchange.drinks" , "drinks.juzi" , null , messageStr.getBytes()); } Thread.sleep(Integer.MAX_VALUE); } } |
当成功的发送消息的时候会回调监听器中的handleAck
方法,如果没有发送成功会回调handleNack
方法 在这个监听器里面有两个参数一个deliveryTag
和multiple
:
- deliveryTag:表示当前的Channel发送的第几条消息
- multiple:是否在确认多条消息
这个异步的虽然在听觉上感觉比较厉害些,这里也不推荐使用,原因和上面的一样,我们并不能具休的知道是哪一条消息没有被确认发送。
综上:这里更加推荐单条消息确认,具体选择哪一种还是要用业务做出选择
注:注意一点是当一条消息成功的发送到Broker,但是如果没有正确的路由到队列,那么这时borker也是会返回true,因为Broker确时接收到了消息只是RoutingKey不可达,所以这里也会返回true,并且直接将消息丢弃
消息的返回机制
这个消息返回机制的作用就是在当一个消息成功的发送,但是并没有正确路由到队列的时候所回调的。
这也弥补了上面确认消息是否发送成功但没有路由到队列所返回true的问题 在使用消息返回机制的时候在发送消息时需要将mandatory
置成true。再添加对应的监听器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost( "host" ); cf.setPort( 5672 ); cf.setUsername( "账号" ); cf.setPassword( "密码" ); try (Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.addReturnListener( new ReturnCallback() { @Override public void handle(Return returnMessage) { System.out.println( "replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:" + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody())); } }); Map<String,String> mes = new HashMap<>(); mes.put( "name" , "11111" ); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks" , "drinks.juzi1" , true , null , messageStr.getBytes()); Thread.sleep(Integer.MAX_VALUE); } } |
这里的addReturnListener方法有两个重载:只不过是handle的参数不同,一个是参数都显示在了参数列表内,一个是将参数封装到了Return对象内。当handle被回调的时候也可以获取到相应的参数比如:exchange routingkey body。
注:保证消息可靠性投递的前提是服务的高可用,服务不高可用谈其它的都是扯
以上就是详解RabbitMq如何做到消息的可靠性投递的详细内容,更多关于RabbitMq 消息可靠性投递的资料请关注脚本之家其它相关文章!
微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
相关文章
spring cloud zuul 与 sentinel的结合使用操作
这篇文章主要介绍了spring cloud zuul 与 sentinel 的结合使用操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-06-06
最新评论