java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Boot 3 集成 RabbitMQ

Spring Boot 3 集成 RabbitMQ 实践指南(原理解析)

作者:翱翔-蓝天

本文介绍了SpringBoot 3集成RabbitMQ的实践指南,涵盖了RabbitMQ的核心原理、核心概念、高级特性、应用场景、环境搭建、核心配置类、消息生产者、消息消费者、接口控制器、监控与运维、最佳实践以及常见问题与解决方案等内容,感兴趣的朋友一起看看吧

Spring Boot 3 集成 RabbitMQ 实践指南

1. RabbitMQ 核心原理

1.1 什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,使用Erlang语言开发,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。

1.2 核心概念

1.2.1 基础组件

Producer(生产者)

Consumer(消费者)

Exchange(交换机)

Queue(队列)

Binding(绑定)

1.2.2 高级特性

消息持久化

消息确认机制

死信队列(DLX)

1.3 应用场景

异步处理

应用解耦

流量控制

定时任务

2. 环境搭建

2.1 基础环境

2.2 依赖配置

<dependencies>
    <!-- Spring Boot Starter AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

2.3 基础配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 消息确认配置
    publisher-confirm-type: correlated  # 开启发布确认
    publisher-returns: true             # 开启发布返回
    template:
      mandatory: true                   # 消息路由失败返回
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual        # 手动确认
        prefetch: 1                     # 每次获取消息数量
        retry:
          enabled: true                 # 开启重试
          initial-interval: 1000        # 重试间隔时间
          max-attempts: 3               # 最大重试次数
          multiplier: 1.0              # 重试时间乘数
    # SSL配置(可选)
    ssl:
      enabled: false
      key-store: classpath:keystore.p12
      key-store-password: password
      trust-store: classpath:truststore.p12
      trust-store-password: password

3. 核心配置类

3.1 RabbitMQ配置类

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    // 交换机名称
    public static final String BUSINESS_EXCHANGE = "business.exchange";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    // 队列名称
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    // 路由键
    public static final String BUSINESS_KEY = "business.key";
    public static final String DEAD_LETTER_KEY = "dead.letter.key";
    // 业务交换机
    @Bean
    public DirectExchange businessExchange() {
        return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
                .durable(true)
                .build();
    }
    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true)
                .build();
    }
    // 业务队列
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // 消息过期时间
        args.put("x-message-ttl", 60000);
        // 队列最大长度
        args.put("x-max-length", 1000);
        // 死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE)
                .withArguments(args)
                .build();
    }
    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    // 业务绑定
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
                .to(businessExchange())
                .with(BUSINESS_KEY);
    }
    // 死信绑定
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_KEY);
    }
    // 消息转换器
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // RabbitTemplate配置
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

3.2 消息确认配置

@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功: correlationData={}", correlationData);
        } else {
            log.error("消息发送到交换机失败: correlationData={}, cause={}", correlationData, cause);
            // 处理失败逻辑,如重试、告警等
        }
    }
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息路由到队列失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
                returned.getExchange(),
                returned.getRoutingKey(),
                returned.getReplyCode(),
                returned.getReplyText(),
                new String(returned.getMessage().getBody()));
        // 处理失败逻辑,如重试、告警等
    }
}

4. 消息生产者

4.1 消息发送服务

@Service
@Slf4j
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(Object message, String exchange, String routingKey) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            log.info("消息发送成功: message={}, exchange={}, routingKey={}, correlationData={}",
                    message, exchange, routingKey, correlationData);
        } catch (Exception e) {
            log.error("消息发送异常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
                    message, exchange, routingKey, correlationData, e.getMessage());
            throw new RuntimeException("消息发送失败", e);
        }
    }
    public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor messagePostProcessor = msg -> {
            msg.getMessageProperties().setDelay((int) delayMillis);
            return msg;
        };
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
            log.info("延迟消息发送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
                    message, exchange, routingKey, delayMillis, correlationData);
        } catch (Exception e) {
            log.error("延迟消息发送异常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
                    message, exchange, routingKey, delayMillis, correlationData, e.getMessage());
            throw new RuntimeException("延迟消息发送失败", e);
        }
    }
}

5. 消息消费者

5.1 消息处理服务

@Service
@Slf4j
public class MessageConsumer {
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);
            // 业务处理
            processMessage(messageBody);
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("消息处理成功: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            log.error("消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            // 判断是否重新投递
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重试,拒绝消息: deliveryTag={}", deliveryTag);
                channel.basicReject(deliveryTag, false);
            } else {
                log.info("消息首次处理失败,重新投递: deliveryTag={}", deliveryTag);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    private void processMessage(String message) {
        // 实现具体的业务逻辑
        log.info("处理消息: {}", message);
    }
}

5.2 死信消息处理

@Service
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String messageBody = new String(message.getBody());
            log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);
            // 死信消息处理逻辑
            processDeadLetter(messageBody);
            channel.basicAck(deliveryTag, false);
            log.info("死信消息处理成功: deliveryTag={}", deliveryTag);
        } catch (Exception e) {
            log.error("死信消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            channel.basicReject(deliveryTag, false);
        }
    }
    private void processDeadLetter(String message) {
        // 实现死信消息处理逻辑
        log.info("处理死信消息: {}", message);
    }
}

6. 接口控制器

@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {
    @Autowired
    private MessageProducer messageProducer;
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {
        try {
            messageProducer.sendMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY);
            return ResponseEntity.ok("消息发送成功");
        } catch (Exception e) {
            log.error("消息发送失败: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("消息发送失败: " + e.getMessage());
        }
    }
    @PostMapping("/send/delay")
    public ResponseEntity<String> sendDelayMessage(
            @RequestBody MessageDTO message,
            @RequestParam long delayMillis) {
        try {
            messageProducer.sendDelayMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY,
                    delayMillis);
            return ResponseEntity.ok("延迟消息发送成功");
        } catch (Exception e) {
            log.error("延迟消息发送失败: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("延迟消息发送失败: " + e.getMessage());
        }
    }
}

7. 监控与运维

7.1 RabbitMQ管理界面

7.2 Prometheus + Grafana监控

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']

7.3 日志配置

logging:
  level:
    org.springframework.amqp: INFO
    com.your.package: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7.4 告警配置

@Configuration
public class RabbitMQAlertConfig {
    @Value("${alert.dingtalk.webhook}")
    private String webhookUrl;
    @Bean
    public AlertService alertService() {
        return new DingTalkAlertService(webhookUrl);
    }
}

8. 最佳实践

8.1 消息幂等性处理

@Service
public class MessageIdempotentHandler {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    public boolean isProcessed(String messageId) {
        String key = "mq:processed:" + messageId;
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
    }
}

8.2 消息重试策略

@Configuration
public class RetryConfig {
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        return retryTemplate;
    }
}

8.3 消息序列化

@Configuration
public class MessageConverterConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }
}

8.4 消息追踪

@Aspect
@Component
@Slf4j
public class MessageTraceAspect {
    @Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
    public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        String messageId = MDC.get("messageId");
        log.info("开始处理消息: messageId={}", messageId);
        try {
            Object result = joinPoint.proceed();
            log.info("消息处理完成: messageId={}", messageId);
            return result;
        } catch (Exception e) {
            log.error("消息处理异常: messageId={}, error={}", messageId, e.getMessage());
            throw e;
        }
    }
}

9. 常见问题与解决方案

9.1 消息丢失问题

9.2 消息重复消费

9.3 消息堆积问题

9.4 性能优化

10. 高可用部署

10.1 集群配置

spring:
  rabbitmq:
    addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
    username: admin
    password: password
    virtual-host: /

10.2 镜像队列

# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

10.3 负载均衡

# nginx.conf
upstream rabbitmq_cluster {
    server rabbit1:15672;
    server rabbit2:15672;
    server rabbit3:15672;
}

11. 参考资源

到此这篇关于Spring Boot 3 集成 RabbitMQ 实践指南的文章就介绍到这了,更多相关Spring Boot 3 集成 RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文