Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis消息队列

Redis 实现消息队列实际案例

作者:祈祷苍天赐我java之术

文章探讨Redis作为消息队列的三大核心方案(List、Pub/Sub、Stream)及适用场景,分析了轻量部署、高性能、多语言支持等优势,指出Stream在可靠性、消息确认、死信队列等企业级需求上表现最佳,适合电商等高吞吐场景,并提供了实际应用案例与优化建议,感兴趣的朋友一起看看吧

一、为什么选择 Redis 做消息队列?

1.1 Redis 消息队列的核心优势

轻量级部署:无需单独部署 RabbitMQ、Kafka 等消息队列服务,可以直接复用现有 Redis 集群。例如一个电商系统可能已经使用 Redis 做缓存,现在只需增加消息队列功能,无需额外维护其他中间件,显著降低运维成本;

高性能:基于内存操作,单节点 QPS 可达 10 万级,满足高吞吐场景。实测表明,在标准服务器配置下,Redis 处理简单消息的延迟可低至 0.1ms,远优于传统磁盘存储的消息队列;

API 简洁:依托 Redis 原生命令即可实现完整队列功能:

支持多语言:所有主流语言的 Redis 客户端(Java/Jedis、Python/redis-py、Go/redigo 等)均原生支持消息队列相关命令。例如 Java 开发者可以直接使用 Jedis 的 lpush() 方法发送消息,无需额外依赖;

可扩展性:通过 Redis Cluster 可以轻松实现消息队列的横向扩展。例如可以将不同业务的消息分配到不同分片,同时利用 Redis Sentinel 实现高可用,确保消息服务不间断。

1.2 适用场景与不适用场景

适用场景

不适用场景

二、Redis 实现消息队列的 3 种核心方案

方案一、基于 Redis List 的简单消息队列实现

1. 方案概述

Redis 的 List 数据结构是一个双向链表,具有以下特性使其非常适合实现消息队列:

1.1 核心命令详解
角色核心命令作用说明时间复杂度
生产者LPUSH key value1 value2从 List 左侧插入消息(头部插入),支持批量插入,返回插入后 List 的长度O(1)
生产者RPUSH key value1 value2从 List 右侧插入消息(尾部插入),支持批量插入O(1)
消费者BLPOP key timeout从 List 左侧阻塞获取消息(头部取出),若 List 为空则等待timeout秒O(1)
消费者BRPOP key timeout从 List 右侧阻塞获取消息(尾部取出),若 List 为空则等待timeout秒O(1)
监控LLEN key获取当前队列的消息数量O(1)
监控LRANGE key start end查看队列中从start到end的消息(如LRANGE queue 0 9查看前10条)O(S+N)

2. 代码实战(Java + Jedis)

2.1 环境准备

首先引入 Jedis 依赖(Maven):

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version> <!-- 建议使用最新稳定版 -->
</dependency>
2.2 生产者实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class ListMQProducer {
    // 队列key命名规范:业务域:组件类型:数据结构:具体业务
    private static final String QUEUE_KEY = "redis:mq:list:order"; 
    // 使用连接池提高性能
    private static final JedisPool jedisPool = new JedisPool(
        new JedisPoolConfig(),
        "localhost", 
        6379,
        2000,  // 连接超时时间
        null   // 密码
    );
    public static void main(String[] args) throws InterruptedException {
        try (Jedis jedis = jedisPool.getResource()) {
            // 模拟发送10条订单消息
            for (int i = 1; i <= 10; i++) {
                // 消息内容格式:业务标识_序号_时间戳
                String message = String.format("order_%d_%d", i, System.currentTimeMillis());
                // LPUSH命令将消息放入队列头部
                long queueLength = jedis.lpush(QUEUE_KEY, message);
                System.out.printf("生产者发送消息:%s,当前队列长度:%d%n", message, queueLength);
                // 模拟业务处理间隔
                Thread.sleep(500); 
            }
        } catch (Exception e) {
            System.err.println("生产者异常:" + e.getMessage());
        } finally {
            jedisPool.close();
        }
    }
}
2.3 消费者实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
public class ListMQConsumer {
    private static final String QUEUE_KEY = "redis:mq:list:order";
    private static final JedisPool jedisPool = new JedisPool(
        new JedisPoolConfig(),
        "localhost",
        6379
    );
    public static void main(String[] args) {
        System.out.println("消费者启动,等待接收消息...");
        while (true) {
            try (Jedis jedis = jedisPool.getResource()) {
                // BRPOP命令参数:
                // 1. 超时时间3秒(避免空轮询消耗CPU)
                // 2. 可以监听多个队列
                List<String> messages = jedis.brpop(3, QUEUE_KEY);
                if (messages != null) {
                    // BRPOP返回结果格式:
                    // 第一个元素是队列key
                    // 第二个元素是消息内容
                    String message = messages.get(1);
                    System.out.println("消费者接收消息:" + message);
                    // 业务处理逻辑示例
                    processMessage(message);
                } else {
                    System.out.println("队列暂无消息,继续等待...");
                }
            } catch (Exception e) {
                System.err.println("消费者处理消息异常:" + e.getMessage());
                // 异常处理策略:
                // 1. 记录错误日志
                // 2. 重试机制
                // 3. 告警通知
                try {
                    Thread.sleep(5000); // 出错后暂停5秒
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    private static void processMessage(String message) throws InterruptedException {
        // 模拟业务处理
        System.out.println("处理消息:" + message);
        // 解析消息内容
        String[] parts = message.split("_");
        String orderId = parts[1];
        // 模拟业务处理耗时
        Thread.sleep(1000);
        System.out.println("订单" + orderId + "处理完成");
    }
}

3. 方案优化与问题解决

3.1 标准方案的局限性
3.2 消息可靠性优化方案

3.2.1 消息确认机制实现

private static final String CONFIRM_QUEUE_KEY = "redis:mq:list:order:confirm";
private static final String DEAD_QUEUE_KEY = "redis:mq:list:order:dead";
private static final int MAX_RETRY = 3;
// 优化后的消费者处理逻辑
List<String> messages = jedis.brpop(3, QUEUE_KEY);
if (messages != null) {
    String message = messages.get(1);
    // 1. 将消息移到待确认队列(使用RPUSH保持顺序)
    jedis.rpush(CONFIRM_QUEUE_KEY, message);
    try {
        // 2. 处理业务逻辑
        processMessage(message);
        // 3. 处理成功,从待确认队列删除
        jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    } catch (Exception e) {
        System.err.println("处理消息失败:" + message);
        // 4. 检查重试次数
        long retryCount = jedis.incr("retry:" + message);
        if (retryCount <= MAX_RETRY) {
            // 放回主队列重试
            jedis.lpush(QUEUE_KEY, message);
        } else {
            // 超过重试次数,放入死信队列
            jedis.rpush(DEAD_QUEUE_KEY, message);
        }
        // 无论重试还是加入死信队列,都要从待确认队列删除
        jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    }
}

3.2.2 定时补偿任务

// 定时检查待确认队列(每分钟执行)
public void checkConfirmQueue() {
    try (Jedis jedis = jedisPool.getResource()) {
        // 获取待确认队列所有消息
        List<String> pendingMessages = jedis.lrange(CONFIRM_QUEUE_KEY, 0, -1);
        for (String message : pendingMessages) {
            // 检查消息滞留时间
            long createTime = Long.parseLong(message.split("_")[2]);
            long currentTime = System.currentTimeMillis();
            long delay = currentTime - createTime;
            // 超过30秒未处理则重试
            if (delay > 30000) {
                jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
                jedis.lpush(QUEUE_KEY, message);
                System.out.println("消息超时重试:" + message);
            }
        }
    }
}
3.3 性能优化策略
// 生产者批量发送
jedis.lpush(QUEUE_KEY, "msg1", "msg2", "msg3");
// 消费者批量获取(非阻塞)
List<String> batch = jedis.rpop(QUEUE_KEY, 10); // 获取最多10条

管道(Pipeline)优化

try (Pipeline p = jedis.pipelined()) {
    p.lpush(QUEUE_KEY, "msg1");
    p.lpush(QUEUE_KEY, "msg2");
    p.sync(); // 批量提交
}

监控指标

队列长度监控:LLEN key

消费者积压:比较生产和消费速率

异常告警:死信队列增长监控

4. 适用场景分析

4.1 推荐使用场景
4.2 不适用场景
  1. 严格顺序要求:List虽然有序,但在多消费者场景下不能保证全局顺序
  2. 广播模式需求:需要所有消费者收到相同消息
  3. 持久化要求高:Redis是内存数据库,虽然支持持久化但不保证100%可靠
  4. 复杂路由需求:需要根据消息内容路由到不同队列

5. 生产环境建议

# 监控队列长度
redis-cli llen redis:mq:list:order

# 监控Redis内存
redis-cli info memory

示例:payment:mq:list:refund

方案二、基于 Pub/Sub 的广播式消息队列方案详解

Redis Pub/Sub 模型介绍

Redis 的 Pub/Sub(发布 - 订阅)模型是一种高效的"一对多"消息通信机制,它允许生产者将消息发布到特定的频道(Channel),而所有订阅该频道的消费者都能即时接收到这些消息。这种模式特别适合需要实时广播的场景,如新闻推送、实时聊天系统等。

核心命令及功能详解

角色核心命令作用说明
生产者PUBLISH channel message向指定频道发布消息,返回接收消息的消费者数量
消费者SUBSCRIBE channel1 channel2订阅一个或多个频道,阻塞等待消息(订阅状态下只能接收消息,无法执行其他命令)
消费者PSUBSCRIBE pattern使用模式匹配订阅频道(如PSUBSCRIBE redis:mq:pubsub:*订阅所有匹配前缀的频道)

2.1 代码实战(Java + Jedis)

生产者实现(发布消息)
import redis.clients.jedis.Jedis;

public class PubSubProducer {
    // 定义频道名称,采用命名空间方式避免冲突
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news"; 
    // 创建Redis连接实例
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws InterruptedException {
        // 模拟发布3条新闻消息,实际应用中可接入实时数据源
        String[] news = {
            "Redis 7.2版本发布,新增Stream增强功能",
            "基于Redis的消息队列在电商场景的实践",
            "Redis Cluster集群部署最佳实践"
        };

        // 循环发布消息
        for (String msg : news) {
            // 发布消息并获取接收者数量
            long receiverCount = jedis.publish(CHANNEL_KEY, msg);
            System.out.println(String.format(
                "【生产者】发布消息:%s,当前订阅者数量:%d", 
                msg, receiverCount));
            // 模拟消息间隔
            Thread.sleep(1000);
        }
        
        // 关闭连接
        jedis.close();
    }
}
消费者实现(订阅消息)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PubSubConsumer {
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) {
        // 创建自定义的消息处理器
        JedisPubSub pubSub = new JedisPubSub() {
            // 接收到消息时的回调方法
            @Override
            public void onMessage(String channel, String message) {
                System.out.println(String.format(
                    "【消费者1】接收到新消息(频道:%s):%s", 
                    channel, message));
                // 此处可添加业务处理逻辑
                // 例如:解析消息内容、写入数据库、触发其他操作等
            }

            // 成功订阅频道时的回调
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println(String.format(
                    "【消费者1】成功订阅频道:%s,当前订阅总数:%d", 
                    channel, subscribedChannels));
            }
            
            // 可添加其他回调方法如onUnsubscribe、onPSubscribe等
        };

        System.out.println("【消费者1】启动并开始监听...");
        // 开始订阅(该方法会阻塞当前线程)
        jedis.subscribe(pubSub, CHANNEL_KEY);
        
        // 注意:在实际应用中,通常会将订阅逻辑放在独立线程中
        // 以避免阻塞主线程
    }
}

2.2 方案深度分析与应用场景

优点详解
  1. 实时广播能力:天然支持一对多的消息分发,一条消息可以同时被多个消费者接收
  2. 实现简单:无需额外中间件,使用Redis原生命令即可实现
  3. 低延迟:消息发布后立即推送给所有订阅者,延迟通常在毫秒级
  4. 动态扩展:消费者可以随时加入或退出订阅,系统自动处理连接管理
缺点与限制
适用场景分析
不适用场景
  1. 金融交易等要求消息100%可靠的系统
  2. 需要保证消息顺序的场景
  3. 需要消息重放或回溯的业务
  4. 消费者处理能力远低于生产者速率的场景

使用建议

方案 3:基于 Stream 的可靠消息队列(Redis 5.0+)

Redis 5.0 推出的 Stream 数据结构是专门为消息队列场景设计的,它完美解决了传统 List 和 Pub/Sub 模式的诸多缺陷。Stream 支持消息持久化存储、消息确认机制、消费者组管理、死信队列等企业级特性,是目前 Redis 实现可靠消息队列的最佳方案。在实际应用中,如电商订单处理、支付流水记录、日志收集等场景都能发挥重要作用。

3.1 Stream 核心概念

Stream:消息队列的主体,每个 Stream 有唯一的 key(如"order:stream")。消息以"条目(Entry)"形式存储,每个条目包含:

消费者组(Consumer Group):通过将多个消费者归为一组,实现:

消息确认(ACK)机制

  1. 消费者获取消息后,消息进入"Pending"状态
  2. 处理完成后需显式发送ACK命令
  3. 未确认的消息会在消费者断开后重新分配

Pending 列表

死信队列

3.2 核心命令详解

基本操作命令
操作类型命令格式说明
添加消息XADD key * field1 value1 [field2 value2...]*表示自动生成ID,可指定ID保证顺序
创建消费者组XGROUP CREATE key groupname id [MKSTREAM]MKSTREAM选项在Stream不存在时自动创建
消费消息XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key [id]id通常为>表示新消息,0表示Pending消息
消息确认XACK key groupname id [id...]支持批量确认多个消息
查看Pending消息XPENDING key groupname [start end count] [consumer]可查看指定消费者的未确认消息
消息所有权转移XCLAIM key groupname consumer min-idle-time id [id...]将空闲超时的消息转给其他消费者处理
高级管理命令
  1. 消息回溯XREAD STREAMS key 0-0从最早消息开始读取
  2. 范围查询XRANGE key start end [COUNT n]按ID范围查询
  3. 监控命令XINFO GROUPS key查看消费者组信息

3.3 代码实战(Java + Jedis)

1. 环境准备
// Maven依赖
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.1</version>
</dependency>
// 连接配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
try (JedisPool pool = new JedisPool(config, "localhost", 6379)) {
    Jedis jedis = pool.getResource();
    // 业务代码...
}
2. 生产消费完整流程

生产者增强版

public class EnhancedProducer {
    private static final String[] ORDER_STATUS = {"PENDING", "PAID", "SHIPPED", "COMPLETED"};
    public void sendOrderEvent(Order order) {
        try (Jedis jedis = pool.getResource()) {
            Map<String, String> fields = new HashMap<>();
            fields.put("order_id", order.getId());
            fields.put("user_id", order.getUserId());
            fields.put("amount", order.getAmount().toString());
            fields.put("status", ORDER_STATUS[0]);
            fields.put("create_time", Instant.now().toString());
            // 使用事务保证原子性
            Transaction t = jedis.multi();
            t.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, fields);
            t.sadd("order:ids", order.getId()); // 记录订单ID集合
            t.exec();
            // 添加监控埋点
            Metrics.counter("mq.produce.count").increment();
        }
    }
}

消费者增强版

public class ReliableConsumer implements Runnable {
    private static final int MAX_RETRY = 3;
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                    GROUP_NAME, consumerName,
                    new StreamParams().count(1).block(2000),
                    new StreamOffset(STREAM_KEY, ">")
                );
                if (messages != null) {
                    messages.forEach((stream, entries) -> {
                        entries.forEach(entry -> {
                            processWithRetry(entry);
                        });
                    });
                }
            } catch (Exception e) {
                logger.error("消费异常", e);
                sleep(1000);
            }
        }
    }
    private void processWithRetry(StreamEntry entry) {
        int retryCount = getRetryCount(entry.getID());
        if (retryCount >= MAX_RETRY) {
            moveToDeadLetter(entry);
            return;
        }
        try {
            Order order = parseOrder(entry.getFields());
            orderService.process(order);
            jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
        } catch (Exception e) {
            logger.warn("处理失败准备重试", e);
            sleep(1000 * (retryCount + 1));
        }
    }
}
3. 死信队列管理
public class DeadLetterMonitor {
    public void checkPendingMessages() {
        // 获取所有超时未确认的消息
        List<StreamEntry> pending = getPendingMessages(TIMEOUT_MS);
        pending.forEach(entry -> {
            // 检查重试次数
            if (getRetryCount(entry.getID()) > MAX_RETRY) {
                // 转移到死信队列
                jedis.xadd(DEAD_STREAM_KEY, StreamEntryID.NEW_ENTRY, entry.getFields());
                jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
                logger.warn("消息转入死信队列: {}", entry.getID());
                // 发送告警通知
                alertService.notifyAdmin(entry);
            }
        });
    }
    public void reprocessDeadLetters() {
        // 从死信队列重新处理
        List<StreamEntry> deadMessages = jedis.xrange(DEAD_STREAM_KEY, "-", "+");
        deadMessages.forEach(entry -> {
            try {
                manualProcess(entry.getFields());
                jedis.xdel(DEAD_STREAM_KEY, entry.getID());
            } catch (Exception e) {
                logger.error("死信处理失败", e);
            }
        });
    }
}

3.4 最佳实践建议

// 批量消费提高吞吐量
jedis.xreadGroup(GROUP_NAME, consumerName, 
    new StreamParams().count(100).block(1000),
    new StreamOffset(STREAM_KEY, ">"));

// 批量确认减少网络开销
jedis.xack(STREAM_KEY, GROUP_NAME, id1, id2, id3);
// 消费者崩溃后的恢复处理
public void recoverConsumer(String failedConsumer) {
    List<PendingEntry> pendings = jedis.xpending(
        STREAM_KEY, GROUP_NAME, "-", "+", 100, failedConsumer);
    pendings.forEach(pending -> {
        jedis.xclaim(STREAM_KEY, GROUP_NAME, currentConsumer, 
            TIMEOUT_MS, pending.getIdAsString());
    });
}

通过以上实现,基于Redis Stream的消息队列可以达到:

三、三种方案的选型对比与最佳实践

3.1 方案选型对比表:

对比维度List 方案Pub/Sub 方案Stream 方案(推荐)
消息持久化支持(需手动处理)不支持原生支持
消息确认需自定义(如RPOPLPUSH)不支持原生支持(ACK机制)
广播能力不支持原生支持(全量广播)支持(通过多消费者组实现)
消费者负载均衡支持(竞争消费模式)不支持(全量推送)支持(消费者组内自动均衡)
死信队列需自定义(备份List)不支持支持(通过XCLAIM命令)
实现复杂度低(基础命令即可)低(订阅/发布模式)中(需理解消费者组概念)
内存占用线性增长瞬时内存可控制(支持消息修剪)
历史消息回溯有限支持(需保存完整List)不支持完整支持(消息ID时间序列)
适用场景简单异步通信实时广播通知可靠消息、企业级场景

3.2 最佳实践建议

  1. 选型决策树:

    • 首要判断消息可靠性需求:
      • 必须保证不丢失 → 直接选择Stream
      • 可接受偶尔丢失 → 进入下一判断
    • 次要判断消息分发模式:
      • 需要广播 → 选择Pub/Sub
      • 点对点消费 → 选择List或Stream
    • 最后评估开发成本:
      • 快速实现 → 选择List
      • 长期维护 → 选择Stream
  2. Stream方案实施细节:

    • 消费者组创建示例:
      XGROUP CREATE mystream mygroup $ MKSTREAM
      
    • 典型消费代码逻辑:
      1. 使用XREADGROUP阻塞读取
      2. 业务处理成功后发送XACK
      3. 处理失败时使用XCLAIM转移消息
      4. 设置合理的PEL(Pending Entries List)超时
  3. List方案优化建议:

    • 可靠消费模式实现:
      RPOPLPUSH source_list backup_list  # 原子操作
      # 处理成功后再LREM备份列表
    • 性能提升技巧:
      • 批量生产:使用Pipeline打包多个LPUSH
      • 批量消费:LUA脚本实现多消息批量获取
  4. 集群环境特别注意事项:

    • 跨slot访问问题:
      • 所有相关key必须使用相同hash tag(如{msg})
      • 或者采用客户端分片路由
    • 监控重点指标:
      • Stream方案的PEL积压长度
      • List方案的内存增长曲线
      • Pub/Sub的客户端连接数
  5. 运维管理建议:

    • 容量规划:
      • 按业务峰值QPS的1.5倍预留资源
      • Stream建议单分片不超过10MB/s写入
    • 监控告警:
      • 设置消息积压阈值(如Stream的PEL>1000)
      • 监控消费者延迟(XINFO GROUPS)
    • 灾备方案:
      • 定期备份Stream的RDB快照
      • 对于关键业务实现双写机制

四、实际应用案例:电商订单异步处理

4.1 业务流程详解

电商平台的订单处理采用异步消息队列模式,通过Redis Stream实现可靠的消息传递和消费。整个流程包含以下关键环节:

  1. 订单创建阶段

    • 用户下单后,订单服务作为生产者将订单数据持久化到MySQL数据库
    • 同时将订单关键信息(订单ID、用户ID、商品ID、数量等)封装为消息,发送到名为"order_create"的Stream中
    • 消息格式示例:
      {
        "order_id": "ORD20231125001",
        "user_id": "U10086",
        "product_id": "P8808",
        "quantity": "2"
      }
  2. 并行消费阶段

    • 通知服务(消费者1):专门处理用户通知

      • 消费消息后调用短信平台API或极光推送服务
      • 通知内容示例:"尊敬的会员,您的订单ORD20231125001已创建成功,我们将尽快为您处理"
      • 支持重试机制:若首次发送失败,会按照指数退避策略重试3次
    • 库存服务(消费者2):负责库存扣减

      • 采用乐观锁机制更新库存:UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?
      • 实现分布式事务:若扣减失败会记录操作日志,便于后续人工核对
  3. 异常处理机制

    • 当库存扣减失败时,消息会进入Pending列表并设置5分钟超时
    • 超时后自动转移到死信队列"DLQ:order_create"
    • 运维人员通过管理后台查看死信队列,可:
      • 人工补扣库存
      • 触发订单取消流程
      • 联系用户协商处理

4.2 核心代码实现(生产级优化版)

订单服务(生产者)增强实现

// 订单服务(生产者)发送消息 - 增强版
public void createOrder(Order order) {
    // 1. 数据库事务确保数据一致性
    TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
    try {
        // 1.1 保存主订单
        orderMapper.insert(order);
        // 1.2 保存订单明细
        order.getItems().forEach(item -> {
            item.setOrderId(order.getId());
            orderItemMapper.insert(item);
        });
        // 2. 构建消息体(添加时间戳和业务标识)
        Map<String, String> message = new HashMap<>();
        message.put("order_id", order.getId());
        message.put("user_id", order.getUserId());
        message.put("product_id", order.getProductId()); 
        message.put("quantity", order.getQuantity() + "");
        message.put("create_time", System.currentTimeMillis() + "");
        message.put("biz_type", "NORMAL_ORDER");
        // 3. 发送消息(添加重试机制)
        int retryTimes = 0;
        while (retryTimes < 3) {
            try {
                jedis.xadd("redis:mq:stream:order_create", null, message);
                break;
            } catch (Exception e) {
                retryTimes++;
                if (retryTimes == 3) {
                    throw new RuntimeException("消息发送失败", e);
                }
                Thread.sleep(1000 * retryTimes);
            }
        }
        transactionManager.commit(status);
    } catch (Exception e) {
        transactionManager.rollback(status);
        throw new BusinessException("订单创建失败", e);
    }
}

通知服务(消费者)完整实现

// 通知服务(消费者)完整实现
public void handleNotification() {
    // 初始化消费者组(幂等操作)
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                "order_group", 
                "notify_consumer_" + instanceId, // 使用实例ID区分消费者
                1, 
                5000, 
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null && !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                    Map<String, String> content = entry.getFields();
                    String userId = content.get("user_id");
                    String orderId = content.get("order_id");
                    // 1. 发送短信(带熔断机制)
                    boolean smsSent = circuitBreaker.execute(() -> 
                        smsService.send(userId, "订单通知", "您的订单" + orderId + "已创建成功"));
                    // 2. 发送APP推送
                    boolean pushSent = pushService.send(userId, "订单创建通知", 
                        Map.of("orderId", orderId, "type", "order_created"));
                    if (smsSent || pushSent) {
                        // 至少一个通知发送成功才确认消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("order_notify");
                    } else {
                        monitorService.recordFailure("order_notify");
                    }
                }
            }
        } catch (Exception e) {
            log.error("通知处理异常", e);
            monitorService.recordError("order_notify", e);
            Thread.sleep(5000); // 异常休眠避免循环异常
        }
    }
}
private void initConsumerGroup(String streamKey, String groupName) {
    try {
        jedis.xgroupCreate(streamKey, groupName, StreamEntryID.LAST_ENTRY, true);
    } catch (RedisBusyException e) {
        log.info("消费者组已存在: {}", groupName);
    }
}

库存服务(消费者)完整实现

// 库存服务(消费者)完整实现 
public void handleInventory() {
    // 初始化消费者组
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                "order_group",
                "inventory_consumer_" + instanceId,
                1,
                5000,
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null && !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                    Map<String, String> content = entry.getFields();
                    String productId = content.get("product_id");
                    int quantity = Integer.parseInt(content.get("quantity"));
                    String orderId = content.get("order_id");
                    // 1. 扣减库存(带事务)
                    boolean success = inventoryService.deductWithLog(
                        productId, 
                        quantity,
                        orderId,
                        "ORDER_DEDUCTION"
                    );
                    if (success) {
                        // 2. 确认消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("inventory_deduct");
                    } else {
                        // 3. 记录失败日志
                        log.warn("库存扣减失败 orderId={}, productId={}", orderId, productId);
                        monitorService.recordFailure("inventory_deduct");
                        // 不确认消息,让其进入Pending状态
                    }
                }
            }
        } catch (Exception e) {
            log.error("库存处理异常", e);
            monitorService.recordError("inventory_deduct", e);
            Thread.sleep(5000);
        }
    }
}
// 库存扣减服务方法
@Transactional
public boolean deductWithLog(String productId, int quantity, String bizId, String bizType) {
    // 1. 扣减库存
    int affected = inventoryMapper.deductWithVersion(
        productId, 
        quantity,
        getCurrentVersion(productId)
    );
    if (affected == 0) {
        return false;
    }
    // 2. 记录操作流水
    InventoryLog log = new InventoryLog();
    log.setLogId(UUID.randomUUID().toString());
    log.setProductId(productId);
    log.setChangedAmount(-quantity);
    log.setBizId(bizId);
    log.setBizType(bizType);
    log.setRemarks("订单扣减");
    inventoryLogMapper.insert(log);
    return true;
}

4.3 监控与运维设计

  1. 监控指标

    • 消息堆积量:XLEN redis:mq:stream:order_create
    • Pending列表数量:XPENDING redis:mq:stream:order_create order_group
    • 消费者延迟:通过消息时间戳与当前时间差值计算
  2. 运维命令示例

    # 查看消费者组信息
    XINFO GROUPS redis:mq:stream:order_create
    # 处理死信消息
    XRANGE DLQ:order_create - + COUNT 10
    XACK DLQ:order_create manual_group <entry_id>
  3. 自动恢复方案

    • 定时任务每小时检查Pending列表
    • 对于超时1小时未处理的消息:
      • 尝试重新投递到原Stream
      • 超过3次重试则转入死信队列
      • 触发企业微信告警通知运维人员

到此这篇关于Redis 是如何实现消息队列的?的文章就介绍到这了,更多相关Redis消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文