java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RabbitMQ消息幂等性

RabbitMQ 如何解决消息幂等性的问题

作者:王小白_Ada

这篇文章主要介绍了RabbitMQ 如何解决消息幂等性的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

前言

关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解。

1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

注意:

配置:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

4. 如何解决消费者幂等性问题

防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

在这里插入图片描述

伪代码:

生产者核心代码:

请求头设置消息id(messageId)

@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //请求头设置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消费者核心代码:

@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 获取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
  //从redis中获取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已经消费
   return; //结束
  }
  System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 获取email参数
  String email = jsonObject.getString("email");
  // 请求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因为网络原因,造成无法访问,继续重试
   throw new Exception("调用接口失败!");
  }
  System.out.println("执行结束....");
  //① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
 }

5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

2.消费者增加代码:

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack
channel.basicAck(deliveryTag, false);手动签收
//邮件队列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手动ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手动签收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保证幂等性,数据一致性

mq的作用主要是用来解耦,削峰,异步,

增加MQ,系统的复杂性也会增加很多,

也会带来其他的问题,比如MQ挂了怎么办,怎么保持数据的幂等性

幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少,

也就是数据一致性问题。

下面是MQ丢失的3种情况

rabbitmq-message-lose

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

rabbitmq-message-lose-solution

数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文