java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > springboot db与mq双写不一致

Spring Boot 事务实战之如何解决 DB 与 MQ 的"双写不一致"问题

作者:qq_25624705

本文结合代码案例探讨如何利用Spring的 TransactionSynchronizationManager 实现事务提交后触发(Trigger After Commit)机制,优雅解决数据库与消息队列的"双写一致性"问题,感兴趣的朋友跟随小编一起看看吧

摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 IM 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(OSS)中产生无法回收的“孤儿文件”,还会引发并发重复处理的问题。本文结合代码案例,探讨如何利用 Spring 的 TransactionSynchronizationManager 实现 事务提交后触发 (Trigger After Commit) 机制,优雅解决数据库与消息队列的“双写一致性”问题。

1. 引言:一个看似简单的顺序问题

在“信令与媒体分离”的架构中,核心流程通常如下:API 服务收到消息 -> 落库(标记为 Pending) -> 异步通知 Worker 搬运文件

这一流程涉及两个异构系统的写操作:

在实际开发中,直觉性的代码编写往往会陷入以下误区:

误区一:先发消息,后入库

// ❌ 错误示范
natsPublisher.publish(task); // 1. 消息发出,Worker 开始下载转存
messageRepository.save(message); // 2. 数据库报错(如字段超长、唯一键冲突)
// 后果:DB 回滚,业务无记录,但 OSS 中产生了一个永远无法被引用的“孤儿文件”。

误区二:在事务内发消息

// ❌ 错误示范
@Transactional
public void handle() {
    messageRepository.save(message);
    natsPublisher.publish(task); 
    // 3. 代码执行完毕,但在事务提交(Commit)的一瞬间数据库连接断开
}
// 后果:Worker 收到任务并完成处理,但在回调更新状态时发现 DB 中不存在该记录。

2. 核心方案:事务提交后的“惊险一跃”

为了保证 “只有数据库确确实实持久化成功了,才去触发异步任务”,最佳实践是利用 Spring 框架提供的事务同步机制。

以下是优化后的代码实现:

2.1 主业务逻辑

// 1. 准备阶段:预生成任务(纯内存操作,无副作用)
// 此时并没有真正发送 NATS 消息,只是构建了对象
List<MediaTransferTask> mediaTasks = prepareMediaTransferTasks(msg, ids.sessionId());
// 2. 构建消息实体
WxMessage message = buildMessage(msg, accountId, ids.sessionId(), ids.senderId());
try {
    // 【核心步骤 A】数据库落库 (Source of Truth)
    // 这是唯一的“事实来源”。如果这里失败,后续一切都不应发生。
    messageRepository.save(message);
    log.info("Message saved: id={}, wxid={}", message.getId(), message.getWxid());
    // 3. 发布会话更新事件 (内存事件或 MQ)
    SessionUpdateEvent event = SessionUpdateEvent.builder()
        .accountId(accountId)
        // ... build params
        .build();
    sessionEventPublisher.publishSessionUpdate(event);
    // 【核心步骤 B】注册事务回调
    // 关键点:这里不是立即发送,而是“预约”发送
    publishMediaTransferTasksAfterCommit(mediaTasks);
    return ProcessResult.success();
} catch (DataIntegrityViolationException e) {
    // 【并发场景的保护】
    // 如果两个线程同时处理同一条消息(如网络重放或客户端重试),
    // 数据库的唯一索引会抛出此异常。
    // 由于消息发送逻辑在事务提交后执行,失败的线程事务回滚,
    // 因此“afterCommit”钩子不会被触发,完美避免了 Worker 重复搬运文件。
    log.debug("Duplicate message (concurrent): {}", msg.getMessageId());
    return ProcessResult.duplicate();
}

2.2 事务同步器的实现

publishMediaTransferTasksAfterCommit 方法利用了 Spring 的 TransactionSynchronizationManager 来挂载回调。
private void publishMediaTransferTasksAfterCommit(List<MediaTransferTask> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return;
    }
    // 判断当前是否在事务中
    if (TransactionSynchronizationManager.isActualTransactionActive()) {
        // 注册同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // 【真正发送的时机】
                // 只有当 DB 事务成功 Commit 后,这一行才会执行
                // 此时 DB 里一定有数据,Worker 回调一定能成功
                tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
                log.debug("Async media tasks published after commit: size={}", tasks.size());
            }
        });
    } else {
        // 如果不在事务中(比如非事务方法调用),则立即发送(降级策略)
        tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
    }
}

3. 深度解析:方案优势

3.1 杜绝“孤儿资源”

通过 afterCommit 钩子,严格保证了因果关系:因(DB落库成功) -> 果(触发搬运)。 如果 messageRepository.save(message) 因为任何原因(业务校验失败、数据库异常)导致事务回滚,afterCommit 回调将永远不会被执行,NATS 消息也就不会发出,从而从源头上避免了 OSS 资源的浪费。

3.2 天然的幂等性防护

代码中对 DataIntegrityViolationException 的捕获处理是该方案的另一大亮点。 在分布式场景下,消息重复投递是常见现象。

4. 兜底策略:应对“反向不一致”

虽然该方案解决了“有文件没记录”的问题,但理论上仍存在极低概率的“反向不一致”:DB 提交成功了,但在执行 afterCommit 发送 NATS 消息的一瞬间,服务宕机或断电。

此时,数据库中存在一条状态为 PENDING 的记录,但永远不会有 Worker 来处理它。

为了达到金融级的一致性,系统应补充一个兜底补偿机制

5. 总结

在处理“数据库事务”与“外部系统调用(MQ/RPC)”混合的业务场景时,“事务同步器(Transaction Synchronization)” 是 Spring 体系中解决双写一致性问题的利器。

通过这一模式的重构,系统实现了:

核心原则:先落库,再提交,回调之中发消息。

到此这篇关于Spring Boot 事务实战之如何优雅解决 DB 与 MQ 的"双写不一致"问题的文章就介绍到这了,更多相关springboot db与mq双写不一致内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文