Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis MQ消费幂等

Redis处理MQ消费幂等的实现示例

作者:sjsjsbbsbsn

本文主要介绍了Redis处理MQ消费幂等的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

本方案参考马哥短连接项目中的消息幂等处理方案

一.生成者模版代码

@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "message-queue.type", havingValue = "rocketmq")
public class CustomMessageProducer implements MessageQueueProducer {
    
    private final RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topic}")
    private String customTopic;

    /**
     * 通用的发送方法,允许自定义消息内容和处理逻辑
     * @param messagePayload 消息体数据
     */
    @Override
    public void send(Map<String, String> messagePayload) {
        // 生成唯一消息键,用于标识消息的幂等性
        String messageId = UUID.randomUUID().toString();
        messagePayload.put("messageId", messageId);

        // 构建消息
        Message<Map<String, String>> message = MessageBuilder
                .withPayload(messagePayload)
                .setHeader(MessageConst.PROPERTY_KEYS, messageId)
                .build();

        // 发送消息并处理结果
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(customTopic, message, 2000L);
            log.info("消息发送成功: 状态={}, 消息ID={}, 消息键={}", sendResult.getSendStatus(), sendResult.getMsgId(), messageId);
        } catch (Exception e) {
            log.error("消息发送失败: 消息内容={}", JSON.toJSONString(messagePayload), e);
            // 添加自定义的失败处理逻辑
        }
    }
}

关键说明

这样设计有助于实现通用的消息发送,只需更改 messagePayload 数据结构和自定义处理逻辑即可适应不同业务。

二.消息幂等处理器模版代码

package com.example.project.mq.idempotent;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消息幂等处理器
 */
@Component
@RequiredArgsConstructor
public class MessageIdempotentHandler {

    private final StringRedisTemplate stringRedisTemplate;
    private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:";

    /**
     * 判断消息是否已被处理
     *
     * @param messageId 消息唯一标识
     * @return true 表示消息未处理,可以继续处理;false 表示消息已处理,避免重复消费
     */
    public boolean isMessageNotProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        // 尝试设置新键,如果不存在则返回 true 表示未处理,存在则返回 false
        return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));
    }

    /**
     * 标记消息处理流程完成
     *
     * @param messageId 消息唯一标识
     */
    public void markAsProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
    }

    /**
     * 查询消息是否已经处理完成
     *
     * @param messageId 消息唯一标识
     * @return true 表示消息处理已完成,false 表示未完成
     */
    public boolean isProcessingComplete(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
    }

    /**
     * 处理异常时删除幂等标识
     *
     * @param messageId 消息唯一标识
     */
    public void clearProcessedFlag(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.delete(key);
    }
}

职责描述

三.生产者代码模版

@Override
public void onMessage(Map<String, String> producerMap) {
    // 获取消息的唯一标识符
    String keys = producerMap.get("keys");
    
    // 检查是否已处理过该消息,幂等性控制
    if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) {
        // 若该消息流程尚未完成
        if (messageQueueIdempotentHandler.isAccomplish(keys)) {
            return; // 跳过已完成流程的消息
        }
        throw new ServiceException("消息未完成流程,需要消息队列重试");
    }
    
    // 业务逻辑处理
    try {
       	//调用业务方法代码
        }
    } catch (Throwable ex) {
        log.error("消费异常", ex);
        throw ex;
    }
    
    // 标记消息处理完成
    messageQueueIdempotentHandler.setAccomplish(keys);
}


职责描述

onMessage 模板的职责是接收并处理消息队列中的消息,确保幂等性,并在需要时抛出异常让消息队列进行重试。以下是该方法中关键步骤的职责和操作说明:

四.消息消费流程

假设消息消费应用场景如下:

五.总结

在本方案中,通过使用 Redis 实现 MQ 消息的幂等处理,确保了消息在消费过程中只会被处理一次,避免了重复消费带来的业务异常和资源浪费。其主要特点和优势如下:

总的来说,此方案为消息幂等性控制提供了一种可扩展、通用且高效的实现方式,非常适合在高并发分布式系统中应用,能够有效提高消息消费的稳定性和安全性

到此这篇关于Redis处理MQ消费幂等的实现示例的文章就介绍到这了,更多相关Redis MQ消费幂等内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文