Redis从基础到Stream消息队列实战指南
作者:用什么都重名
一、Redis 简介
Redis(Remote Dictionary Server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。
二、Redis 核心特点
| 特点 | 说明 |
|---|---|
| 内存存储 | 数据主要在内存中,读写延迟低(通常微秒级) |
| 持久化 | 支持 RDB 快照和 AOF 日志,保证数据不丢失 |
| 多种数据结构 | String、List、Hash、Set、Sorted Set、Stream、Bitmap 等 |
| 单线程模型 | 命令串行执行,避免锁竞争,保证原子性 |
| 主从复制 | 支持读写分离和高可用 |
| 发布订阅 | 支持 Pub/Sub 和 Stream 消息队列 |
| 事务 | 支持 MULTI/EXEC 事务 |
| Lua 脚本 | 支持 Lua 脚本,保证原子性 |
| 集群 | 支持 Redis Cluster 水平扩展 |
三、Redis 常用数据结构
3.1 String(字符串)
SET key value # 设置 GET key # 获取 INCR key # 自增 EXPIRE key seconds # 设置过期时间
3.2 Hash(哈希)
HSET user:1 name "张三" age 25 HGET user:1 name HGETALL user:1
3.3 List(列表)
LPUSH queue task1 # 左侧入队 RPOP queue # 右侧出队 LRANGE queue 0 -1 # 范围查询
3.4 Set(集合)
SADD tags "redis" "cache" SMEMBERS tags SISMEMBER tags "redis"
3.5 Sorted Set(有序集合)
ZADD rank 100 "user1" 95 "user2" ZRANGE rank 0 -1 WITHSCORES ZREVRANK rank "user1"
四、Redis Stream:消息队列
Redis 5.0 引入 Stream,用于实现消息队列,支持:
- 消息持久化
- 消费者组
- 消息确认(ACK)
- 历史消息回溯
4.1 基本命令
# 添加消息 XADD mystream * field1 value1 field2 value2 # 读取消息(从头) XREAD COUNT 10 STREAMS mystream 0 # 读取最新消息(阻塞) XREAD BLOCK 5000 STREAMS mystream $
4.2 消费者组
# 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 消费消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > # 确认消息 XACK mystream mygroup message-id # 查看待确认消息 XPENDING mystream mygroup
4.3 Stream 与 List 的对比
| 特性 | List | Stream |
|---|---|---|
| 消息持久化 | 有 | 有 |
| 消费者组 | 无 | 有 |
| 消息确认 | 无 | 有 |
| 历史回溯 | 有限 | 支持 |
| 适用场景 | 简单队列 | 可靠消息队列 |
五、Redis 在项目中的典型用法
以银行场景下的 SOP 合规检测为例,Redis Stream 用于任务分发和结果回传。
5.1 架构示意
后端/前端 → XADD 推送任务 → Redis Stream
↓
sop_engine 监听 XREADGROUP
↓
处理完成后 XADD 推送结果
↓
后端消费 message:xxx:results
5.2 MAF 营销分析任务流
# 1. 推送 MAF 任务 XADD message:maf:tasks * task_id "maf_001" \ conversation_id "maf_001" \ conversation_json_path "bucket/path/conversation.json" # 2. 引擎监听 message:maf:tasks,消费并分析 # 3. 引擎完成后写入结果流 XADD message:maf:results * task_id "maf_001" status "completed" \ result_path "maf-results-bucket/maf_001/analysis_result.json"
5.3 消费者组带来的好处
- 多个 worker 共同消费,实现负载均衡
- 每条消息只被组内一个消费者处理
- 支持 ACK,失败可重试
- 支持 PEL(Pending Entries List)查看未确认消息
5.4 Redis Stream 请求分发规则
如果两个项目使用了同一套Redis配置,那么无法保证Redis Stream 请求会由谁处理,取决于 Redis 消费者组的分发。
两个容器都连同一个 Redis,监听同一个 Stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:
- Redis 会把消息分发给组内的消费者
- 每条消息只会被组内一个消费者处理
- 具体是容器 1 还是容器 2,由 Redis 的分发策略决定,无法指定
因此:
- 有时是A容器处理
- 有时是B容器处理
- 测试时无法稳定地“只让某个容器”处理请求
可能带来的问题:
| 场景 | 说明 |
|---|---|
| 代码版本混用 | 同一批任务,部分由旧代码处理,部分由新代码处理 |
| 测试不可控 | 无法保证测试请求一定打到新容器 |
| 任务重复 | 若消费者组配置不当,可能出现同一条消息被多个消费者处理 |
如何保证只使用新代码:
方案 1:停掉旧容器
# 停掉旧容器后再部署新容器 docker stop <old_container_id> docker run ... # 启动新容器
方案 2:用不同的 Stream 做测试
- 新容器监听不同的 Stream,例如 message:maf:tasks_test
- 测试时往 message:maf:tasks_test 发消息
- 需要改配置或环境变量,让新容器使用 message:maf:tasks_test
方案 3:用不同的消费者组
- 新容器使用不同的消费者组名,例如 maf_analyze_group_v2
- 两个组都会收到同一条消息,各自处理一次
- 适合做 A/B 或灰度,但会产生重复处理,需要业务上能接受
方案 4:只保留一个容器
- 部署新容器前先停掉旧容器
- 或使用滚动更新,保证同一时间只有一个版本在跑
六、Redis 常用配置
# 绑定地址 bind 0.0.0.0 # 端口 port 6379 # 密码 requirepass your_password # 最大内存 maxmemory 2gb maxmemory-policy allkeys-lru # 持久化 save 900 1 save 300 10 save 60 10000 appendonly yes
七、Python 操作 Redis
7.1 Linux安装
# Ubuntu/Debian sudo apt update sudo apt install redis-server # CentOS/RHEL sudo yum install redis # 启动服务 sudo systemctl start redis sudo systemctl enable redis # 验证 redis-cli ping # 返回 PONG 表示成功
7.2 Docker安装
docker run -d --name redis -p 6379:6379 redis:latest # 带密码 docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword
7.3 连接Redis
# 本地连接 redis-cli # 带密码连接 redis-cli -a yourpassword # 远程连接 redis-cli -h host -p 6379 -a password
7.4 基本使用
import redis
r = redis.Redis(host='localhost', port=6379, db=0, password='')
# String
r.set('name', 'Redis')
print(r.get('name'))
# Stream 添加消息
r.xadd('mystream', {'task_id': '001', 'data': 'hello'})
# Stream 消费
messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1)
for stream, msgs in messages:
for msg_id, data in msgs:
print(msg_id, data)
r.xack('mystream', 'mygroup', msg_id)八、Redis 使用建议
- 合理设置 maxmemory 和淘汰策略,避免 OOM。
- 生产环境开启 requirepass 和访问控制。
- 使用连接池,减少连接开销。
- 对热点 key 做拆分或本地缓存,减轻压力。
- 使用 Pipeline 批量执行命令,降低网络往返。
- Stream 场景下注意消费者组和 ACK,避免消息堆积和重复消费。
总结
Redis 适合做缓存、会话存储和消息队列。Stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 List 更合适。结合具体业务(如 MAF 任务流),可以设计出清晰、可扩展的异步处理架构。
到此这篇关于Redis从基础到Stream消息队列实战指南的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
