java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot集成RocketMQ事务消息

SpringBoot集成RocketMQ事务消息的完整指南

作者:IT橘子皮

事务消息是 RocketMQ 提供的一种高级消息类型,用于解决分布式场景下,本地数据库事务与消息发送之间的一致性问题,下面小编就来和大家聊聊SpringBoot集成RocketMQ事务消息的完整方法吧

事务消息是 RocketMQ 提供的一种高级消息类型,用于解决分布式场景下,本地数据库事务与消息发送之间的一致性问题。它通过两阶段提交事务状态回查机制,确保本地事务执行与消息投递达到最终一致性,尤其适用于订单支付、积分变更等需要高可靠性的业务场景。

事务消息的核心原理

事务消息的核心机制可以概括为以下两个阶段和一种补偿机制:

第一阶段:发送半消息(Half Message)​

第二阶段:提交或回滚

生产者开始执行本地事务​(例如,操作本地数据库)。

根据本地事务的执行结果(成功或失败),生产者向 Broker 发送 ​二次确认指令​(Commit 或 Rollback)。

事务回查(Transaction Check)​

为了更直观地理解整个流程,下图概括了事务消息的完整生命周期:

在SpringBoot项目中实现事务消息

下面我们基于 rocketmq-spring-boot-starter来实现一个完整的事务消息示例,以“订单支付成功后通知积分服务增加积分”为场景。

1. 添加依赖

首先确保 pom.xml中包含必要的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置生产者与事务监听器

核心是创建一个事务监听器,它包含了执行本地事务和处理事务回查的两个方法。

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@Service
@RocketMQTransactionListener(txProducerGroup = "tx-order-group") // 与发送方组名一致
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;
    
    /**
     * 执行本地事务
     * @param msg 收到的消息
     * @param arg 调用sendMessageInTransaction时传入的额外参数
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 从消息头或arg中获取业务ID,如订单ID
        String orderId = (String) msg.getHeaders().get("orderId");
        try {
            // 执行本地业务逻辑,例如:更新订单状态为“支付成功”
            boolean success = orderService.updateOrderStatus(orderId, OrderStatus.PAID);
            // 根据执行结果返回提交或回滚
            return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            // 记录日志,返回UNKNOWN状态,等待Broker回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /**
     * 事务回查方法
     * @param msg 收到的消息
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = (String) msg.getHeaders().get("orderId");
        // 根据orderId查询数据库,确认本地事务的最终状态
        OrderStatus status = orderService.queryOrderStatus(orderId);
        if (OrderStatus.PAID.equals(status)) {
            // 本地事务已成功,提交消息
            return RocketMQLocalTransactionState.COMMIT;
        } else if (OrderStatus.FAILED.equals(status)) {
            // 本地事务已失败,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            // 状态仍不明确,继续等待下次回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

3. 发送事务消息

在业务服务中,使用 RocketMQTemplate发送事务消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void payOrder(String orderId) {
        // 1. 构建消息
        Message<String> message = MessageBuilder.withPayload("订单支付成功,增加积分")
                .setHeader("orderId", orderId) // 设置业务ID,用于回查
                .build();
        
        // 2. 发送事务消息
        // 参数1: 事务组名(需与监听器内txProducerGroup一致)
        // 参数2: 主题(Topic)
        // 参数3: 消息体
        // 参数4: 可选参数,会传递给executeLocalTransaction方法的arg参数
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx-order-group", "order-topic", message, orderId);
        
        System.out.println("发送结果:" + result.getSendStatus());
    }
}

4. 消费者端实现幂等性

事务消息只能保证消息生产端的一致性,消费端需要自行保证消息的幂等性,因为网络重试可能导致消息被重复消费。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group")
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 1. 解析消息,获取订单ID
        // 2. 【关键】幂等性校验:查询数据库或Redis,判断该订单的积分是否已经添加过
        // if (已处理) { return; }
        
        // 3. 执行业务逻辑(例如,为用户增加积分)
        // creditService.addCredit(...);
        
        // 4. 记录处理状态,标记该消息已处理
    }
}

重要注意事项与最佳实践

以上就是SpringBoot集成RocketMQ事务消息的完整指南的详细内容,更多关于SpringBoot集成RocketMQ事务消息的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文