java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RabbitMQ保证消息的可靠性

RabbitMQ保证消息的可靠性问题及解决

作者:又旅行又开拓的绳匠..

文章主要讨论了如何确保消息队列中消息的可靠性,包括生产者重试机制、确认机制、MQ持久化配置和消费者确认机制等,还介绍了消费者本地重试和失败消息入队的处理策略,以及幂等性的实现方案,最后给出了一个综合的配置示例

消息可靠性问题

在消息队列,任何一个环节出问题都会导致消息的不可靠(消息丢失),如何确保消息的可靠性呢,需要考虑到其中的每个角色,生产者可靠性、MQ可靠性、消费者可靠性。

生产者可靠性

生产者重试

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
        max-attempts: 3 # 总共尝试次数

耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息

生产者确认

1.Publisher Return

消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。

2.Publisher Confirm

消息投递成功返回ack,投递失败返回nack。

注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。

默认两种机制都是关闭状态,需要通过配置文件来开启。

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

MQ可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:

在配置的时候默认都会持久化

消费者可靠性

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。

即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态。

回执有三种可选值:

SpringAMQP帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式:

none:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

auto:自动模式。当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

通过下面的配置可以修改消息确认的处理方式为auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

auto模式就是平常的写法

manual模式需要手写

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg, Channel channel, Message message) throws InterruptedException, IOException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        //返回nack
        //每个参数的意义:1.消息的标记 2.是否确认之前所有未确认的消息 3.是否重新入队
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//        log.info("消息处理完成");
//        //返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

失败重试机制

本地重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的投递到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
          max-attempts: 3 # 最大重试次数

失败消息入队

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个固定交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。

//在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//定义一个RepublishMessageRecoverer,指定失败消息投递交换机的名称及routingkey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

监听失败消息队列将失败消息写入数据库中,由人工定期处理

业务幂等性

幂等性:在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。

例如:

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

所以,我们要尽可能避免业务被重复执行,然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况。

例如:

因此,我们必须想办法保证消息处理的幂等性。

这里给出两种方案:

唯一消息ID思路非常简单:

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。

例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,使用唯一消息ID的方案需要操作数据库或Redis保存消息ID,所以更推荐使用业务判断的方案。

1.创建交换机,队列,消息进行持久化

2.生产者:

3.消费者:

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文