rocketMQ如何避免消息重复消费问题
作者:qq_16570607
rocketMQ如何避免消息重复消费
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,造成重复消费。
这个重复简单可以概括为以下情况:
- produce发送到Broker时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者producer宕机,导致服务端Broker 对 producer应答失败。
如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
- Broker投递消息到Consumer时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。
为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
- 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker或客户端重启、扩容或缩容时,会触发Rebalance重平衡机制,此时消费者可能会收到重复消息。
幂等性解决方案
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。这个id可以使用分布式中间件redis,zookeeper等去生成。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
同时,建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
消息持久层做消息唯一性的策略(该方案未经证实)
1.持久化过程中业务唯一标识验证,每个消息具有业务唯一标识,在消息最终持久化之前通过验证唯一性标识保证消息的唯一性。消息持久化位置如果出现同样的消息,系统就不做处理,期间无任何传递过程,保证消息的唯一性。
2.使用过程中业务唯一标识验证,使用过程中如果出现同样的消息,系统进行相应的异常处理。
rocketMQ消息重复消费场景
答案
将去重操作直接放在了消费端,消费端处理消息的业务逻辑保持幂等性。消费者收到消息后,从消息中获取消息标识写入到Redis(分布式锁)或数据库(标识作为表唯一索引插入一条记录),当再次收到该消息时就不作处理。
在broker端对Queue加锁(synchronized),Consumer监听的Queue存在已投递但未收到ack且未超时的消息,不允许获取锁,直到该Queue投递的消息全部ack或者消费超时,才允许新的Consumer获取锁,拉取消息。
思考问题
- 1、为什么不在生产者去重?
- 2、为什么在消费者做去重?
一、网络波动导致系统A消息发送到RocketMQ后没有收到消息发送超时,系统A重试导致消息重复
1、RocketMQ支持消息查询的功能,消息发送前去RocketMQ查询一下是否已经发送过该条消息,存在则不发送,不存在发送到RocketMQ。在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,会影响接口的性能。
2、redis分布式锁,在发送消息到RocketMQ成功之后,向redis中插入一条数据,如果发生重试,则先去redis中查询是否存在,存在的话不再发送消息。redis集群此时宕机,再次查询redis判断消息是否已经发送过,无法得到正确结果的。
以上两种方式只是保证只发送了一次,不能保证消费只一次的情况。
二、MQ要保证消息投递的可靠性,对未ack的消息,会重复投递。
- 场景一:broker发送Consumer超时后重新发送
消费者端要保证消费的幂等性,从消息中获取消息标识写入到Redis或数据库,当再次收到该消息时不作处理。
- 场景二:负载均衡阶段,前一个监听Queue的消费实例拉取的消息未全部ack,新的消费实例监听到这个Queue重新拉取消息。
在负载均衡结果变化过程增加了一个过渡态,在过渡态的时候,Consumer会继续保留上一次负载均衡的结果,直到原消费者拉取的消息全部ack,才释放老的结果。
在broker端对Queue加锁(synchronized),Consumer监听的Queue存在已投递但未收到ack且未超时的消息,不允许获取锁,直到该Queue投递的消息全部ack或者消费超时,才允许新的Consumer获取锁,拉取消息。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。