java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RabbitMQ 消息重试机制

RabbitMQ消息发送失败的重试机制核心流程

作者:Wang's Blog

本文介绍了基于NestJS和RabbitMQ的高可靠消息重试机制设计,通过三级保障机制(预持久化、实时回调处理、定时任务补偿)和灵活的存储层(支持MySQL/Redis),确保消息在发送失败时能够可靠重试,感兴趣的朋友跟随小编一起看看吧

核心问题

重试机制核心流程

1 )消息发送前持久化

2 )消息发送后处理

3 )定时任务补偿

关键业务流程与 RabbitMQ 交互

NestJS 服务层实现

1 ) 消息存储接口设计(TransMessageService

// trans-message.interface.ts
export interface TransMessageService {
  saveForSend(
    exchange: string,
    routingKey: string,
    payload: string
  ): Promise<TransMessagePO>;
  deleteOnAck(id: string): Promise<void>;
  saveOnReturn(
    id: string,
    exchange: string,
    routingKey: string,
    payload: string 
  ): Promise<TransMessagePO>;
  listPendingMessages(): Promise<TransMessagePO[]>;
  incrementRetryCount(id: string): Promise<void>;
  markAsDead(id: string): Promise<void>;
}

2 ) 数据库实现(TypeORM + PostgreSQL)

// trans-message.service.ts 
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { TransMessagePO } from './trans-message.entity';
@Injectable()
export class DBTransMessageService implements TransMessageService {
  constructor(
    @InjectRepository(TransMessagePO)
    private readonly repo: Repository<TransMessagePO>,
    private readonly serviceName: string 
  ) {}
  async saveForSend(exchange: string, routingKey: string, payload: string) {
    const message = new TransMessagePO();
    message.id = uuidv4();
    message.serviceName = this.serviceName;
    message.exchange = exchange;
    message.routingKey = routingKey;
    message.payload = payload;
    message.retryCount = 0;
    message.status = 'PENDING';
    return this.repo.save(message);
  }
  async deleteOnAck(id: string) {
    await this.repo.delete({ id, serviceName: this.serviceName });
  }
  // 其他方法实现类似,略 
}

消息发送器(TransMessageSender)

1 ) 发送消息核心逻辑

// trans-message.sender.ts
import { Injectable, Logger } from '@nestjs/common';
import { RabbitMQService } from '@golevelup/nestjs-rabbitmq';
import { TransMessageService } from './trans-message.interface';
@Injectable()
export class TransMessageSender {
  private readonly logger = new Logger(TransMessageSender.name);
  constructor(
    private readonly rmqService: RabbitMQService,
    private readonly messageService: TransMessageService
  ) {}
  async send(exchange: string, routingKey: string, payload: any) {
    const payloadString = JSON.stringify(payload);
    try {
      // 1. 持久化到数据库
      const messagePO = await this.messageService.saveForSend(
        exchange,
        routingKey,
        payloadString
      );
      // 2. 发送到RabbitMQ
      await this.rmqService.publish(exchange, routingKey, payload, {
        messageId: messagePO.id, // 关键:设置消息ID
        persistent: true
      });
      this.logger.log(`Message sent: ${messagePO.id}`);
    } catch (e) {
      this.logger.error(`Send failed: ${e.message}`, e.stack);
    }
  }
}

2 ) 回调处理(Confirm & Return)

// rabbitmq.config.ts
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [{ name: 'orders', type: 'topic' }],
      uri: 'amqp://localhost',
      connectionInitOptions: { wait: false },
      // 注册回调函数
      setupController: (channel) => {
        channel.on('return', (msg) => this.handleReturn(msg));
        channel.on('ack', (msg) => this.handleAck(msg));
      }
    })
  ]
})
export class AppModule {
  handleReturn(msg: ConsumeMessage) {
    const { exchange, routingKey } = msg.fields;
    const payload = msg.content.toString();
    const messageId = msg.properties.messageId; // 关键:获取消息ID
    this.messageService.saveOnReturn(messageId, exchange, routingKey, payload);
  }
  handleAck(msg: ConsumeMessage) {
    const messageId = msg.properties.messageId;
    this.messageService.deleteOnAck(messageId);
  }
}

定时任务与重试控制

// retry.task.ts
import { Injectable, Logger } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
@Injectable()
export class RetryTask {
  private readonly logger = new Logger(RetryTask.name);
  private readonly MAX_RETRY = 5;
  constructor(
    private messageService: TransMessageService,
    private scheduler: SchedulerRegistry
  ) {
    this.startRetryCycle();
  }
  startRetryCycle() {
    const interval = setInterval(async () => {
      const messages = await this.messageService.listPendingMessages();
      for (const msg of messages) {
        if (msg.retryCount >= this.MAX_RETRY) {
          await this.messageService.markAsDead(msg.id);
          this.triggerAlert(`Message dead: ${msg.id}`);
          continue;
        }
        await this.messageService.incrementRetryCount(msg.id);
        await this.resendMessage(msg);
      }
    }, 60_000); // 每分钟执行 
    this.scheduler.addInterval('retry-job', interval);
  }
  private async resendMessage(msg: TransMessagePO) {
    // 调用发送器重新发送(略)
  }
}

NestJS 核心代码实现

  1. 消息持久化服务层(TransMessageService
// trans-message.service.ts
import { Injectable } from '@nestjs/common';
import { TransMessage } from './entity/trans-message.entity';
import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { MessageType } from './enums/message-type.enum';
@Injectable()
export class TransMessageService {
  constructor(
    @InjectRepository(TransMessage)
    private readonly messageRepo: Repository<TransMessage>,
    private readonly serviceName: string, // 注入服务标识
  ) {}
  // 发送前持久化
  async createReadyMessage(
    exchange: string,
    routingKey: string,
    payload: string,
  ): Promise<TransMessage> {
    const message = this.messageRepo.create({
      id: uuidv4(),
      service: this.serviceName,
      exchange,
      routingKey,
      payload,
      sendDate: new Date(),
      sequence: 0,
      type: MessageType.READY,
    });
    return this.messageRepo.save(message);
  }
  // 发送成功删除记录 
  async markMessageSuccess(id: string): Promise<void> {
    await this.messageRepo.delete({ id, service: this.serviceName });
  }
  // 消息路由失败后重新持久化
  async recreateReturnedMessage(
    exchange: string,
    routingKey: string,
    payload: string,
  ): Promise<TransMessage> {
    return this.createReadyMessage(exchange, routingKey, payload);
  }
  // 查询待重试消息
  async listReadyMessages(): Promise<TransMessage[]> {
    return this.messageRepo.find({
      where: { type: MessageType.READY, service: this.serviceName },
    });
  }
  // 递增重试次数
  async incrementRetryCount(id: string): Promise<void> {
    const message = await this.messageRepo.findOneBy({ id });
    if (message) {
      message.sequence += 1;
      await this.messageRepo.save(message);
    }
  }
  // 标记消息为死亡状态
  async markMessageDead(id: string): Promise<void> {
    const message = await this.messageRepo.findOneBy({ id });
    if (message) {
      message.type = MessageType.DEAD;
      await this.messageRepo.save(message);
    }
  }
}
  1. 消息发送器(TransMessageSender
// trans-message.sender.ts
import { Injectable, Logger } from '@nestjs/common';
import { RabbitMQService } from '@golevelup/nestjs-rabbitmq';
import { TransMessageService } from './trans-message.service';
@Injectable()
export class TransMessageSender {
  private readonly logger = new Logger(TransMessageSender.name);
  constructor(
    private readonly rmqService: RabbitMQService,
    private readonly transMessageService: TransMessageService,
  ) {}
  async send(exchange: string, routingKey: string, payload: object): Promise<void> {
    try {
      // 1. 序列化消息
      const payloadString = JSON.stringify(payload);
      // 2. 持久化到数据库
      const message = await this.transMessageService.createReadyMessage(
        exchange,
        routingKey,
        payloadString,
      );
      // 3. 发送至 RabbitMQ
      await this.rmqService.publish(exchange, routingKey, payload, {
        messageId: message.id,
        contentType: 'application/json',
      });
      this.logger.log(`Message sent: ${message.id}`);
    } catch (e) {
      this.logger.error(`Send failed: ${e.message}`, e.stack);
    }
  }
}
  1. RabbitMQ 回调处理
// rmq.config.ts
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { TransMessageService } from './trans-message.service';
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [{ name: 'orders', type: 'topic' }],
      uri: 'amqp://localhost:5672',
      connectionInitOptions: { wait: false },
      // 注册回调处理器 
      setupController: (channel) => {
        // 确认回调 
        channel.on('ack', (msg) => {
          const messageId = msg.properties.messageId;
          this.transMessageService.markMessageSuccess(messageId);
        });
        // 返回回调
        channel.on('return', (msg) => {
          const { exchange, routingKey } = msg.fields;
          const payload = msg.content.toString();
          this.transMessageService.recreateReturnedMessage(
            exchange,
            routingKey,
            payload,
          );
        });
      },
    }),
  ],
})
export class RmqConfigModule {}

工程示例:1

1 ) 方案1:基础持久化+重试(推荐)

2 ) 方案2:Redis 高性能存储

// redis-trans-message.service.ts
import { RedisService } from '@liaoliaots/nestjs-redis';
@Injectable()
export class RedisTransMessageService implements TransMessageService {
  private readonly KEY_PREFIX = 'msg:';
  constructor(private redisService: RedisService) {}
  async saveForSend(exchange: string, routingKey: string, payload: string) {
    const id = uuidv4();
    const client = this.redisService.getClient();
    await client.hset(`${this.KEY_PREFIX}${id}`, {
      exchange,
      routingKey,
      payload,
      retryCount: '0',
      status: 'PENDING'
    });
    return { id } as TransMessagePO;
  }
  // 其他方法通过Redis HSET/HGET/DEL实现 
}

3 ) 方案3:分布式事务框架集成

工程示例:2

1 ) 方案 1:基础数据库重试

// task.service.ts 
@Injectable()
export class RetryTaskService {
  constructor(private readonly transMessageService: TransMessageService) {}
  @Cron('*/5 * * * * *') // 每5秒执行
  async handleRetry() {
    const messages = await this.transMessageService.listReadyMessages();
    messages.forEach(async (msg) => {
      if (msg.sequence < 5) {
        await this.transMessageService.incrementRetryCount(msg.id);
        // 重新发送逻辑(略)
      } else {
        await this.transMessageService.markMessageDead(msg.id);
        // 触发告警(略)
      }
    });
  }
}

2 ) 方案 2:Redis 高性能暂存

优势:

// 使用 Redis 存储消息
import { Redis } from 'ioredis';
@Injectable()
export class RedisTransMessageService implements TransMessageService {
  private readonly redis = new Redis();
  async createReadyMessage(
    exchange: string,
    routingKey: string,
    payload: string,
  ): Promise<TransMessage> {
    const id = uuidv4();
    await this.redis.hset(
      `msg:${id}`,
      'payload', payload,
      'exchange', exchange,
      'routingKey', routingKey,
      'sequence', '0',
    );
    return { id, exchange, routingKey, payload, sequence: 0 };
  }
}

3 ) 方案 3:死信队列(DLX)自动重试

RabbitMQ 配置命令:

创建死信交换机和队列
rabbitmqadmin declare exchange name=dlx type=direct
rabbitmqadmin declare queue name=dead_messages
rabbitmqadmin declare binding source=dlx destination=dead_messages routing_key=dead
主队列绑定死信路由
rabbitmqadmin declare queue name=orders \
  arguments='{"x-dead-letter-exchange":"dlx", "x-dead-letter-routing-key":"dead"}'

NestJS 消费死信消息:

@RabbitSubscribe({
  exchange: 'dlx',
  routingKey: 'dead',
  queue: 'dead_messages',
})
async handleDeadMessage(msg: any) {
  // 解析原始消息ID并重新持久化
  const originalMsgId = msg.properties.headers['x-original-message-id'];
  await this.transMessageService.recreateReturnedMessage(
    msg.fields.exchange,
    msg.fields.routingKey,
    msg.content.toString(),
  );
}

工程示例:3

1 ) 方案1:数据库存储方案(TypeORM)

// trans-message.service.ts
@Injectable()
export class TransMessageService {
  constructor(
    @InjectRepository(TransMessage)
    private readonly messageRepo: Repository<TransMessage>
  ) {}
  async prePersist(exchange: string, routingKey: string, payload: string) {
    const message = this.messageRepo.create({
      exchange,
      routingKey,
      payload,
      status: 'PENDING'
    });
    return this.messageRepo.save(message);
  }
  async markAsSuccess(id: string) {
    await this.messageRepo.delete(id);
  }
}

2 ) 方案2:Redis 高性能存储方案

// redis-trans.service.ts
import { RedisService } from '@liaoliaots/nestjs-redis';
@Injectable()
export class RedisTransService {
  constructor(private readonly redisService: RedisService) {}
  async prePersist(exchange: string, routingKey: string, payload: string) {
    const client = this.redisService.getClient();
    const id = uuidv4();
    await client.hset(
      `msg:${id}`,
      'exchange', exchange,
      'routingKey', routingKey,
      'payload', payload,
      'status', 'PENDING'
    );
    return id;
  }
}

3 ) 方案3:混合存储方案(数据库+Redis)

关键配置与命令

RabbitMQ 必要设置

启用持久化交换机和队列
rabbitmqctl set_policy HA ".*" '{"ha-mode":"all"}' --apply-to all
监控命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged

NestJS 模块配置

// app.module.ts
@Module({
  imports: [
    TypeOrmModule.forFeature([TransMessagePO]),
    RabbitMQModule.forRootAsync(RabbitMQModule, {
      useFactory: () => ({
        uri: process.env.RABBITMQ_URI,
        exchanges: [{ name: 'orders', type: 'topic', durable: true }]
      })
    })
  ],
  providers: [
    { provide: TransMessageService, useClass: DBTransMessageService },
    TransMessageSender,
    RetryTask
  ]
})
export class AppModule {}

RabbitMQ 周边配置处理

1 ) NestJS 连接配置

// rabbitmq.module.ts
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [{ name: 'orders', type: 'topic' }],
      uri: 'amqp://user:pass@localhost:5672',
      connectionInitOptions: { wait: false }
    })
  ]
})
export class RabbitModule {}

2 ) 关键 Shell 命令

创建带持久化的交换机
rabbitmqadmin declare exchange name=orders type=topic durable=true
监控未路由消息
rabbitmqctl list_queues name messages_unroutable

3 ) 告警机制实现

// alert.service.ts
@Injectable()
export class AlertService {
  async triggerAlert(messageId: string) {
    // 集成邮件/Slack/Webhook
    await slackService.send(`消息 ${messageId} 重试超限!`);
  }
}

核心优化点

  1. 消息轨迹追踪
    通过 properties.messageId 实现全链路消息关联
  2. 并发控制
    使用 Redis 分布式锁防止多副本重试冲突:
    import Redlock from 'redlock';
    const lock = await redlock.acquire([`lock:${messageId}`], 5000);
    
  3. 退避策略
    指数级延长重试间隔:delay = Math.min(2 retryCount * 1000, 60000)

关键设计原则:

补充知识点

  1. RabbitMQ 持久化机制
    • 消息需设置 deliveryMode: 2
    • 队列声明时添加 durable: true
  2. NestJS 微服务模式
    // main.ts 启用混合模式
    app.connectMicroservice<RabbitMQTransportOptions>({
      transport: Transport.RMQ,
      options: { urls: ['amqp://...'], queue: 'retry_queue' }
    });
    
  3. 监控指标
    • 重试成功率
    • 平均重试耗时
    • 死信队列堆积量

关键问题解决方案

并发重试控制

const lockKey = `lock:msg:${msg.id}`;
const lock = await redis.set(lockKey, '1', 'EX', 30, 'NX');
if (lock) { /* 执行重试 */ }

使用 Redis 分布式锁确保多副本服务不会重复处理同一条消息:

消息幂等性设计

@RabbitSubscribe({ exchange: 'orders', routingKey: 'order.create' })
async handleOrderEvent(msg: any, @Message() amqpMsg) {
  const messageId = amqpMsg.properties.messageId;
  if (await this.redis.exists(`processed:${messageId}`)) return;
  // 处理业务...
}

注意事项

  1. 消息完整性:
    • 必须设置messageIdpersistent: true确保RabbitMQ端持久化。
  2. 重试设计:
    • 采用指数退避策略(如1s/5s/30s)避免雪崩。
  3. 死信处理:
    • 建议将死信转入独立队列人工干预。
  4. 性能优化:
    • 批量查询待重试消息(如每次100条),减少DB压力

关键点:通过数据库/REDIS双写、ACK回调联动、定时补偿三位一体,实现消息的可靠投递,适用于订单支付、库存同步等高可靠性场景

总结

本文实现了基于 NestJS + RabbitMQ 的高可靠消息重试架构,核心创新点:

  1. 三级保障机制:预持久化 → 实时回调处理 → 定时任务补偿。
  2. 灵活存储层:支持 MySQL/Redis 无缝切换,适应不同规模业务。
  3. 生产级方案:提供数据库重试、Redis 高性能方案、死信队列三种工程实现。

关键提示:

到此这篇关于RabbitMQ: 消息发送失败的重试机制设计与实现的文章就介绍到这了,更多相关RabbitMQ 消息重试机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文