Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > redis stream 消息队列

Redis从基础到Stream消息队列实战指南

作者:用什么都重名

Redis是一个高性能内存数据库,支持字符串、哈希、列表等数据结构,5.0版本新增Stream类型,提供消息队列功能,支持消费者组和消息确认,本文介绍Redis从基础到Stream消息队列实战指南,感兴趣的朋友跟随小编一起看看吧

一、Redis 简介

Redis(Remote Dictionary Server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。

二、Redis 核心特点

特点说明
内存存储数据主要在内存中,读写延迟低(通常微秒级)
持久化支持 RDB 快照和 AOF 日志,保证数据不丢失
多种数据结构String、List、Hash、Set、Sorted Set、Stream、Bitmap 等
单线程模型命令串行执行,避免锁竞争,保证原子性
主从复制支持读写分离和高可用
发布订阅支持 Pub/Sub 和 Stream 消息队列
事务支持 MULTI/EXEC 事务
Lua 脚本支持 Lua 脚本,保证原子性
集群支持 Redis Cluster 水平扩展

三、Redis 常用数据结构

3.1 String(字符串)

SET key value          # 设置
GET key                # 获取
INCR key               # 自增
EXPIRE key seconds     # 设置过期时间

3.2 Hash(哈希)

HSET user:1 name "张三" age 25
HGET user:1 name
HGETALL user:1

3.3 List(列表)

LPUSH queue task1      # 左侧入队
RPOP queue             # 右侧出队
LRANGE queue 0 -1      # 范围查询

3.4 Set(集合)

SADD tags "redis" "cache"
SMEMBERS tags
SISMEMBER tags "redis"

3.5 Sorted Set(有序集合)

ZADD rank 100 "user1" 95 "user2"
ZRANGE rank 0 -1 WITHSCORES
ZREVRANK rank "user1"

四、Redis Stream:消息队列

Redis 5.0 引入 Stream,用于实现消息队列,支持:

4.1 基本命令

# 添加消息
XADD mystream * field1 value1 field2 value2
# 读取消息(从头)
XREAD COUNT 10 STREAMS mystream 0
# 读取最新消息(阻塞)
XREAD BLOCK 5000 STREAMS mystream $

4.2 消费者组

# 创建消费者组
XGROUP CREATE mystream mygroup 0 MKSTREAM
# 消费消息
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# 确认消息
XACK mystream mygroup message-id
# 查看待确认消息
XPENDING mystream mygroup

4.3 Stream 与 List 的对比

特性ListStream
消息持久化
消费者组
消息确认
历史回溯有限支持
适用场景简单队列可靠消息队列

五、Redis 在项目中的典型用法

以银行场景下的 SOP 合规检测为例,Redis Stream 用于任务分发和结果回传。

5.1 架构示意

后端/前端 → XADD 推送任务 → Redis Stream
                              ↓
                    sop_engine 监听 XREADGROUP
                              ↓
                    处理完成后 XADD 推送结果
                              ↓
                    后端消费 message:xxx:results

5.2 MAF 营销分析任务流

# 1. 推送 MAF 任务
XADD message:maf:tasks * task_id "maf_001" \
  conversation_id "maf_001" \
  conversation_json_path "bucket/path/conversation.json"
# 2. 引擎监听 message:maf:tasks,消费并分析
# 3. 引擎完成后写入结果流
XADD message:maf:results * task_id "maf_001" status "completed" \
  result_path "maf-results-bucket/maf_001/analysis_result.json"

5.3 消费者组带来的好处

5.4 Redis Stream 请求分发规则

如果两个项目使用了同一套Redis配置,那么无法保证Redis Stream 请求会由谁处理,取决于 Redis 消费者组的分发。

两个容器都连同一个 Redis,监听同一个 Stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:

因此:

可能带来的问题:

场景说明
代码版本混用同一批任务,部分由旧代码处理,部分由新代码处理
测试不可控无法保证测试请求一定打到新容器
任务重复若消费者组配置不当,可能出现同一条消息被多个消费者处理

如何保证只使用新代码:

方案 1:停掉旧容器

# 停掉旧容器后再部署新容器
docker stop <old_container_id>
docker run ...  # 启动新容器

方案 2:用不同的 Stream 做测试

方案 3:用不同的消费者组

方案 4:只保留一个容器

六、Redis 常用配置

# 绑定地址
bind 0.0.0.0
# 端口
port 6379
# 密码
requirepass your_password
# 最大内存
maxmemory 2gb
maxmemory-policy allkeys-lru
# 持久化
save 900 1
save 300 10
save 60 10000
appendonly yes

七、Python 操作 Redis

7.1 Linux安装

# Ubuntu/Debian
sudo apt update
sudo apt install redis-server
# CentOS/RHEL
sudo yum install redis
# 启动服务
sudo systemctl start redis
sudo systemctl enable redis
# 验证
redis-cli ping
# 返回 PONG 表示成功

7.2 Docker安装

docker run -d --name redis -p 6379:6379 redis:latest
# 带密码
docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword

7.3 连接Redis

# 本地连接
redis-cli
# 带密码连接
redis-cli -a yourpassword
# 远程连接
redis-cli -h host -p 6379 -a password

7.4 基本使用

import redis
r = redis.Redis(host='localhost', port=6379, db=0, password='')
# String
r.set('name', 'Redis')
print(r.get('name'))
# Stream 添加消息
r.xadd('mystream', {'task_id': '001', 'data': 'hello'})
# Stream 消费
messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1)
for stream, msgs in messages:
    for msg_id, data in msgs:
        print(msg_id, data)
        r.xack('mystream', 'mygroup', msg_id)

八、Redis 使用建议

  1. 合理设置 maxmemory 和淘汰策略,避免 OOM。
  2. 生产环境开启 requirepass 和访问控制。
  3. 使用连接池,减少连接开销。
  4. 对热点 key 做拆分或本地缓存,减轻压力。
  5. 使用 Pipeline 批量执行命令,降低网络往返。
  6. Stream 场景下注意消费者组和 ACK,避免消息堆积和重复消费。

总结

Redis 适合做缓存、会话存储和消息队列。Stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 List 更合适。结合具体业务(如 MAF 任务流),可以设计出清晰、可扩展的异步处理架构。

到此这篇关于Redis从基础到Stream消息队列实战指南的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文