Django中使用django-redis库与Redis交互API指南
作者:Python游侠
这篇文章主要介绍了Django缓存与原生Redis的区别,以及如何在Django中使用Redis的各种数据类型进行缓存,文章还提供了一些实战场景和性能优化技巧,帮助开发者充分利用Redis的强大功能来构建高性能的应用,需要的朋友可以参考下
一、理解Django缓存与原生Redis的区别
| Django缓存API | Redis原生数据类型 | 用途 |
|---|---|---|
| 键值对存储 | 字符串(String) | 简单缓存 |
| 不支持 | 列表(List) | 消息队列、最新列表 |
| 不支持 | 集合(Set) | 去重、共同好友 |
| 不支持 | 有序集合(Sorted Set) | 排行榜、优先级队列 |
| 不支持 | 哈希(Hash) | 对象存储、多个字段 |
二、获取原生Redis连接
要在Django中使用Redis的所有数据类型,需要获取原生Redis连接:
# 方法1:通过django-redis获取连接
from django_redis import get_redis_connection
# 获取默认缓存对应的Redis连接
redis_conn = get_redis_connection("default")
# 获取特定缓存的连接
session_conn = get_redis_connection("session")
# 方法2:直接创建连接(不推荐,缺少连接池管理)
import redis
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
password=None
)
三、字符串类型(String)操作
虽然Django缓存API支持字符串,但Redis有更多功能:
1. 带过期时间的操作
def string_operations():
"""Redis字符串操作"""
redis_conn = get_redis_connection("default")
# 1. 设置带过期时间的键
redis_conn.setex("session:user:123", 3600, "session_data") # 1小时后过期
# 2. 设置多个键
redis_conn.mset({"key1": "value1", "key2": "value2"})
# 3. 获取字符串的一部分
redis_conn.set("message", "Hello World")
part = redis_conn.getrange("message", 0, 4) # "Hello"
# 4. 追加字符串
redis_conn.append("message", " Redis!")
# 5. 获取字符串长度
length = redis_conn.strlen("message")
# 6. 设置键的值并返回旧值
old_value = redis_conn.getset("counter", "100")
# 7. 位操作
redis_conn.setbit("user:online:2023-10-01", 123, 1) # 用户123在线
redis_conn.getbit("user:online:2023-10-01", 123) # 检查是否在线
return {
"part": part,
"length": length,
"old_value": old_value
}
四、列表类型(List)操作
1. 基本列表操作
def list_operations():
"""Redis列表操作"""
redis_conn = get_redis_connection("default")
# 1. 左侧推入元素
redis_conn.lpush("recent_users", "user_123", "user_456", "user_789")
# 2. 右侧推入元素
redis_conn.rpush("task_queue", "task1", "task2", "task3")
# 3. 左侧弹出元素
first_user = redis_conn.lpop("recent_users")
# 4. 右侧弹出元素
last_task = redis_conn.rpop("task_queue")
# 5. 获取列表长度
length = redis_conn.llen("recent_users")
# 6. 获取指定范围的元素
users = redis_conn.lrange("recent_users", 0, 9) # 前10个元素
# 7. 移除元素
redis_conn.lrem("recent_users", 1, "user_123") # 删除1个user_123
# 8. 通过索引获取元素
user = redis_conn.lindex("recent_users", 0) # 获取第一个
# 9. 设置指定索引的值
redis_conn.lset("recent_users", 0, "new_user")
return {
"first_user": first_user,
"last_task": last_task,
"length": length,
"users": users
}
2. 实战:消息队列
class MessageQueue:
"""基于Redis列表的消息队列"""
def __init__(self, queue_name="default_queue"):
self.redis_conn = get_redis_connection("default")
self.queue_name = queue_name
def enqueue(self, message):
"""入队"""
import json
message_str = json.dumps(message)
return self.redis_conn.rpush(self.queue_name, message_str)
def dequeue(self, timeout=0):
"""出队(阻塞)"""
import json
result = self.redis_conn.blpop(self.queue_name, timeout=timeout)
if result:
queue_name, message_str = result
return json.loads(message_str)
return None
def size(self):
"""队列大小"""
return self.redis_conn.llen(self.queue_name)
def peek(self):
"""查看但不移除"""
import json
message_str = self.redis_conn.lindex(self.queue_name, 0)
if message_str:
return json.loads(message_str)
return None
# 使用示例
def process_messages():
"""处理消息队列"""
queue = MessageQueue("email_queue")
# 生产者
queue.enqueue({
"to": "user@example.com",
"subject": "Welcome",
"body": "Hello!"
})
# 消费者
while True:
message = queue.dequeue(timeout=5) # 阻塞5秒
if message:
# 处理消息
send_email(**message)
else:
# 没有消息,休息一下
import time
time.sleep(1)
五、集合类型(Set)操作
1. 基本集合操作
def set_operations():
"""Redis集合操作"""
redis_conn = get_redis_connection("default")
# 1. 添加元素
redis_conn.sadd("user_tags:123", "python", "django", "redis")
redis_conn.sadd("user_tags:456", "python", "javascript", "vue")
# 2. 获取所有元素
tags_123 = redis_conn.smembers("user_tags:123")
# 3. 判断元素是否存在
is_member = redis_conn.sismember("user_tags:123", "python")
# 4. 移除元素
redis_conn.srem("user_tags:123", "redis")
# 5. 获取集合大小
size = redis_conn.scard("user_tags:123")
# 6. 随机获取元素
random_tag = redis_conn.srandmember("user_tags:123", 1)
# 7. 集合运算
# 交集:都喜欢的标签
common_tags = redis_conn.sinter("user_tags:123", "user_tags:456")
# 并集:所有标签
all_tags = redis_conn.sunion("user_tags:123", "user_tags:456")
# 差集:用户123有但456没有的标签
diff_tags = redis_conn.sdiff("user_tags:123", "user_tags:456")
return {
"tags_123": tags_123,
"is_member": is_member,
"size": size,
"common_tags": common_tags
}
2. 实战:用户标签系统
class UserTagSystem:
"""基于Redis集合的用户标签系统"""
def __init__(self):
self.redis_conn = get_redis_connection("default")
def add_tag(self, user_id, tag):
"""为用户添加标签"""
return self.redis_conn.sadd(f"user_tags:{user_id}", tag)
def remove_tag(self, user_id, tag):
"""移除用户标签"""
return self.redis_conn.srem(f"user_tags:{user_id}", tag)
def get_user_tags(self, user_id):
"""获取用户所有标签"""
return self.redis_conn.smembers(f"user_tags:{user_id}")
def has_tag(self, user_id, tag):
"""检查用户是否有某个标签"""
return self.redis_conn.sismember(f"user_tags:{user_id}", tag)
def find_users_with_tag(self, tag):
"""查找具有某个标签的所有用户"""
# 需要遍历所有用户,实际应用中可以使用反向索引
pattern = "user_tags:*"
matching_users = []
# 使用SCAN避免阻塞
cursor = 0
while True:
cursor, keys = self.redis_conn.scan(cursor, pattern, count=100)
for key in keys:
if self.redis_conn.sismember(key, tag):
user_id = key.decode().split(":")[1]
matching_users.append(user_id)
if cursor == 0:
break
return matching_users
def get_common_tags(self, user_ids):
"""获取多个用户的共同标签"""
if len(user_ids) < 2:
return []
keys = [f"user_tags:{user_id}" for user_id in user_ids]
return self.redis_conn.sinter(*keys)
def get_recommended_tags(self, user_id):
"""根据相似用户推荐标签"""
user_tags = self.get_user_tags(user_id)
if not user_tags:
return []
# 查找有相同标签的用户
similar_users = []
for tag in user_tags:
users_with_tag = self.find_users_with_tag(tag)
similar_users.extend(users_with_tag)
# 去重并排除自己
similar_users = set(similar_users) - {user_id}
# 收集相似用户的标签
all_tags = set()
for similar_user in list(similar_users)[:10]: # 只看前10个相似用户
tags = self.get_user_tags(similar_user)
all_tags.update(tags)
# 排除用户已有的标签
recommended = all_tags - set(user_tags)
return list(recommended)[:10] # 返回前10个推荐标签
六、有序集合类型(Sorted Set)操作
1. 基本有序集合操作
def sorted_set_operations():
"""Redis有序集合操作"""
redis_conn = get_redis_connection("default")
# 1. 添加元素(带分数)
redis_conn.zadd("leaderboard", {
"player1": 1000,
"player2": 1500,
"player3": 800,
"player4": 2000
})
# 2. 增加分数
redis_conn.zincrby("leaderboard", 100, "player1") # player1增加100分
# 3. 按分数升序获取
ascending = redis_conn.zrange("leaderboard", 0, -1, withscores=True)
# 4. 按分数降序获取
descending = redis_conn.zrevrange("leaderboard", 0, -1, withscores=True)
# 5. 获取排名
rank = redis_conn.zrevrank("leaderboard", "player1") # 降序排名,0为第一
# 6. 获取分数
score = redis_conn.zscore("leaderboard", "player1")
# 7. 获取分数范围内的元素
top_players = redis_conn.zrangebyscore("leaderboard", 1000, 3000, withscores=True)
# 8. 移除元素
redis_conn.zrem("leaderboard", "player4")
# 9. 获取集合大小
size = redis_conn.zcard("leaderboard")
# 10. 统计分数区间的元素数量
count = redis_conn.zcount("leaderboard", 1000, 3000)
return {
"rank": rank,
"score": score,
"top_3": descending[:3],
"count": count
}
2. 实战:排行榜系统
class Leaderboard:
"""基于Redis有序集合的排行榜系统"""
def __init__(self, leaderboard_name="default_leaderboard"):
self.redis_conn = get_redis_connection("default")
self.name = leaderboard_name
def add_score(self, player_id, score):
"""添加或更新分数"""
return self.redis_conn.zincrby(self.name, score, player_id)
def get_top_n(self, n=10):
"""获取前N名"""
return self.redis_conn.zrevrange(
self.name, 0, n-1, withscores=True
)
def get_player_rank(self, player_id):
"""获取玩家排名(从1开始)"""
rank = self.redis_conn.zrevrank(self.name, player_id)
if rank is not None:
return rank + 1
return None
def get_player_score(self, player_id):
"""获取玩家分数"""
return self.redis_conn.zscore(self.name, player_id)
def get_players_around(self, player_id, range_size=5):
"""获取玩家附近的排名"""
rank = self.get_player_rank(player_id)
if rank is None:
return []
start = max(0, rank - 1 - range_size)
end = rank - 1 + range_size
players = self.redis_conn.zrevrange(
self.name, start, end, withscores=True
)
return [
{"player": player, "score": score, "rank": idx + start + 1}
for idx, (player, score) in enumerate(players)
]
def get_season_leaderboard(self, season_id, top_n=100):
"""获取赛季排行榜"""
season_key = f"{self.name}:season:{season_id}"
# 检查是否有缓存的赛季排行榜
cached = self.redis_conn.get(season_key)
if cached:
import json
return json.loads(cached)
# 计算赛季排行榜
season_end = int(season_id) # 假设season_id是时间戳
season_start = season_end - 86400 * 30 # 30天
# 获取赛季期间有分数的玩家
all_players = self.redis_conn.zrevrangebyscore(
f"{self.name}:daily:{season_end}",
season_start, season_end,
withscores=False
)
# 计算每个玩家的赛季总分
pipe = self.redis_conn.pipeline()
for player in all_players:
pipe.zscore(f"{self.name}:daily", player)
scores = pipe.execute()
# 创建有序集合
season_data = {player: score for player, score in zip(all_players, scores) if score}
if season_data:
season_leaderboard_key = f"{self.name}:season_temp:{season_id}"
self.redis_conn.zadd(season_leaderboard_key, season_data)
# 获取前N名
top_players = self.redis_conn.zrevrange(
season_leaderboard_key, 0, top_n-1, withscores=True
)
# 删除临时键
self.redis_conn.delete(season_leaderboard_key)
# 缓存结果
import json
self.redis_conn.setex(
season_key,
3600, # 缓存1小时
json.dumps(top_players)
)
return top_players
return []
# 游戏积分系统示例
class GameScoreSystem:
"""游戏积分系统"""
def __init__(self, game_id):
self.redis_conn = get_redis_connection("default")
self.game_id = game_id
self.leaderboard = Leaderboard(f"game:{game_id}:leaderboard")
def record_score(self, user_id, score, level, time_spent):
"""记录游戏得分"""
import time
timestamp = int(time.time())
# 1. 更新总排行榜
self.leaderboard.add_score(user_id, score)
# 2. 记录详细得分(使用哈希)
score_key = f"game:{self.game_id}:score:{user_id}:{timestamp}"
self.redis_conn.hmset(score_key, {
"score": score,
"level": level,
"time_spent": time_spent,
"timestamp": timestamp
})
# 设置24小时过期
self.redis_conn.expire(score_key, 86400)
# 3. 更新每日排行榜
date_str = time.strftime("%Y%m%d")
daily_key = f"game:{self.game_id}:daily:{date_str}"
self.redis_conn.zincrby(daily_key, score, user_id)
# 4. 更新关卡排行榜
level_key = f"game:{self.game_id}:level:{level}"
self.redis_conn.zincrby(level_key, 1, user_id) # 通关次数
return {
"rank": self.leaderboard.get_player_rank(user_id),
"score": self.leaderboard.get_player_score(user_id)
}
七、哈希类型(Hash)操作
1. 基本哈希操作
def hash_operations():
"""Redis哈希操作"""
redis_conn = get_redis_connection("default")
# 1. 设置单个字段
redis_conn.hset("user:123", "name", "张三")
# 2. 设置多个字段
redis_conn.hmset("user:123", {
"age": 25,
"email": "zhangsan@example.com",
"city": "北京"
})
# 3. 获取单个字段
name = redis_conn.hget("user:123", "name")
# 4. 获取多个字段
fields = redis_conn.hmget("user:123", ["name", "age", "email"])
# 5. 获取所有字段
all_fields = redis_conn.hgetall("user:123")
# 6. 获取所有字段名
field_names = redis_conn.hkeys("user:123")
# 7. 获取所有字段值
field_values = redis_conn.hvals("user:123")
# 8. 检查字段是否存在
exists = redis_conn.hexists("user:123", "name")
# 9. 删除字段
redis_conn.hdel("user:123", "city")
# 10. 获取字段数量
field_count = redis_conn.hlen("user:123")
# 11. 增加数值字段的值
redis_conn.hincrby("user:123", "age", 1) # 增加1岁
return {
"name": name,
"fields": fields,
"exists": exists,
"field_count": field_count
}
2. 实战:购物车系统
class ShoppingCart:
"""基于Redis哈希的购物车系统"""
def __init__(self, cart_id=None):
self.redis_conn = get_redis_connection("default")
self.cart_id = cart_id or f"cart:{int(time.time())}"
def add_item(self, product_id, quantity=1, price=None):
"""添加商品到购物车"""
cart_key = f"cart:items:{self.cart_id}"
# 获取当前数量
current_quantity = self.redis_conn.hget(cart_key, product_id)
if current_quantity:
quantity = int(current_quantity) + quantity
# 更新购物车
self.redis_conn.hset(cart_key, product_id, quantity)
# 如果提供了价格,存储到商品信息
if price is not None:
product_key = f"cart:product_info:{self.cart_id}"
self.redis_conn.hset(product_key, product_id, price)
return quantity
def remove_item(self, product_id, quantity=None):
"""从购物车移除商品"""
cart_key = f"cart:items:{self.cart_id}"
if quantity is None:
# 移除整个商品
self.redis_conn.hdel(cart_key, product_id)
# 同时移除价格信息
product_key = f"cart:product_info:{self.cart_id}"
self.redis_conn.hdel(product_key, product_id)
else:
# 减少数量
current_quantity = self.redis_conn.hget(cart_key, product_id)
if current_quantity:
new_quantity = int(current_quantity) - quantity
if new_quantity > 0:
self.redis_conn.hset(cart_key, product_id, new_quantity)
else:
self.remove_item(product_id)
def get_items(self):
"""获取购物车所有商品"""
cart_key = f"cart:items:{self.cart_id}"
items = self.redis_conn.hgetall(cart_key)
product_key = f"cart:product_info:{self.cart_id}"
prices = self.redis_conn.hgetall(product_key)
result = []
for product_id, quantity in items.items():
product_id = product_id.decode() if isinstance(product_id, bytes) else product_id
quantity = int(quantity)
price = prices.get(product_id.encode() if isinstance(product_id, str) else product_id)
if price:
price = float(price.decode() if isinstance(price, bytes) else price)
result.append({
"product_id": product_id,
"quantity": quantity,
"price": price,
"subtotal": price * quantity if price else None
})
return result
def get_total(self):
"""计算购物车总价"""
items = self.get_items()
return sum(item.get("subtotal", 0) for item in items)
def clear(self):
"""清空购物车"""
cart_key = f"cart:items:{self.cart_id}"
product_key = f"cart:product_info:{self.cart_id}"
self.redis_conn.delete(cart_key, product_key)
def get_item_count(self):
"""获取购物车商品总数"""
cart_key = f"cart:items:{self.cart_id}"
items = self.redis_conn.hvals(cart_key)
return sum(int(q) for q in items) if items else 0
def checkout(self):
"""结账"""
items = self.get_items()
if not items:
return {"success": False, "message": "购物车为空"}
# 生成订单
import uuid
order_id = str(uuid.uuid4())
# 保存订单
order_key = f"order:{order_id}"
self.redis_conn.hmset(order_key, {
"cart_id": self.cart_id,
"total": self.get_total(),
"created_at": int(time.time()),
"status": "pending"
})
# 保存订单商品
for item in items:
self.redis_conn.hset(
f"{order_key}:items",
item["product_id"],
item["quantity"]
)
# 清空购物车
self.clear()
return {
"success": True,
"order_id": order_id,
"total": self.get_total(),
"item_count": len(items)
}
八、高级数据结构组合应用
1. 社交关系系统
class SocialNetwork:
"""基于Redis的社交关系系统"""
def __init__(self):
self.redis_conn = get_redis_connection("default")
def follow(self, follower_id, followee_id):
"""关注用户"""
# 添加到关注列表(集合)
self.redis_conn.sadd(f"user:{follower_id}:following", followee_id)
self.redis_conn.sadd(f"user:{followee_id}:followers", follower_id)
# 记录时间线(有序集合)
import time
timestamp = time.time()
# 获取关注用户的动态
posts_key = f"user:{followee_id}:posts"
posts = self.redis_conn.zrevrangebyscore(
posts_key, timestamp - 86400 * 7, timestamp, # 最近7天的帖子
withscores=True
)
# 添加到关注者的时间线
timeline_key = f"user:{follower_id}:timeline"
for post_id, post_time in posts:
self.redis_conn.zadd(timeline_key, {post_id: post_time})
# 限制时间线长度
self.redis_conn.zremrangebyrank(timeline_key, 0, -1000) # 只保留最新的1000条
def unfollow(self, follower_id, followee_id):
"""取消关注"""
self.redis_conn.srem(f"user:{follower_id}:following", followee_id)
self.redis_conn.srem(f"user:{followee_id}:followers", follower_id)
def get_followers(self, user_id, page=1, per_page=20):
"""获取粉丝列表"""
key = f"user:{user_id}:followers"
start = (page - 1) * per_page
end = start + per_page - 1
# 使用集合获取粉丝
followers = self.redis_conn.smembers(key)
# 获取粉丝详细信息
followers_list = []
for follower_id in list(followers)[start:end+1]:
user_info = self.redis_conn.hgetall(f"user:{follower_id}")
followers_list.append(user_info)
return {
"followers": followers_list,
"total": self.redis_conn.scard(key),
"page": page,
"per_page": per_page
}
def get_mutual_followers(self, user1_id, user2_id):
"""获取共同关注的人"""
key1 = f"user:{user1_id}:following"
key2 = f"user:{user2_id}:following"
return self.redis_conn.sinter(key1, key2)
def post_update(self, user_id, content):
"""发布动态"""
import time
import uuid
post_id = str(uuid.uuid4())
timestamp = time.time()
# 存储帖子内容(哈希)
post_key = f"post:{post_id}"
self.redis_conn.hmset(post_key, {
"id": post_id,
"user_id": user_id,
"content": content,
"timestamp": timestamp,
"likes": 0,
"comments": 0
})
# 添加到用户的帖子列表(有序集合)
user_posts_key = f"user:{user_id}:posts"
self.redis_conn.zadd(user_posts_key, {post_id: timestamp})
# 添加到粉丝的时间线
followers_key = f"user:{user_id}:followers"
followers = self.redis_conn.smembers(followers_key)
for follower_id in followers:
timeline_key = f"user:{follower_id}:timeline"
self.redis_conn.zadd(timeline_key, {post_id: timestamp})
self.redis_conn.zremrangebyrank(timeline_key, 0, -1000)
return post_id
def get_timeline(self, user_id, page=1, per_page=20):
"""获取时间线"""
timeline_key = f"user:{user_id}:timeline"
start = (page - 1) * per_page
end = start + per_page - 1
# 获取帖子ID
post_ids = self.redis_conn.zrevrange(timeline_key, start, end)
# 获取帖子详情
posts = []
for post_id in post_ids:
post = self.redis_conn.hgetall(f"post:{post_id}")
if post:
posts.append(post)
return {
"posts": posts,
"total": self.redis_conn.zcard(timeline_key),
"page": page,
"per_page": per_page
}
2. 实时统计系统
class RealTimeStatistics:
"""实时统计系统"""
def __init__(self):
self.redis_conn = get_redis_connection("default")
def record_event(self, event_type, user_id=None, data=None):
"""记录事件"""
import time
timestamp = int(time.time())
# 1. 记录到全局事件流(列表)
event_id = f"{timestamp}:{user_id}:{event_type}"
self.redis_conn.lpush("events:stream", event_id)
# 2. 按类型计数(哈希)
today = time.strftime("%Y%m%d")
daily_key = f"stats:events:{today}"
self.redis_conn.hincrby(daily_key, event_type, 1)
# 3. 记录用户事件(有序集合)
if user_id:
user_events_key = f"user:{user_id}:events"
self.redis_conn.zadd(user_events_key, {event_type: timestamp})
# 用户最后活跃时间
self.redis_conn.hset(f"user:{user_id}:activity", "last_active", timestamp)
# 4. 按小时统计(哈希)
hour = time.strftime("%Y%m%d%H")
hour_key = f"stats:hourly:{hour}"
self.redis_conn.hincrby(hour_key, event_type, 1)
return event_id
def get_daily_stats(self, date_str=None):
"""获取每日统计"""
import time
if not date_str:
date_str = time.strftime("%Y%m%d")
key = f"stats:events:{date_str}"
return self.redis_conn.hgetall(key)
def get_hourly_trend(self, event_type, hours=24):
"""获取小时趋势"""
import time
from datetime import datetime, timedelta
now = datetime.now()
trend = []
for i in range(hours):
hour = now - timedelta(hours=i)
hour_str = hour.strftime("%Y%m%d%H")
key = f"stats:hourly:{hour_str}"
count = self.redis_conn.hget(key, event_type)
trend.append({
"hour": hour_str,
"count": int(count) if count else 0
})
return list(reversed(trend)) # 从早到晚
def get_active_users(self, minutes=5):
"""获取活跃用户(最近N分钟有活动的用户)"""
import time
# 使用集合存储活跃用户
active_users_key = "users:active"
# 获取所有用户
pattern = "user:*:activity"
active_users = set()
cursor = 0
while True:
cursor, keys = self.redis_conn.scan(cursor, pattern, count=100)
for key in keys:
# 解析用户ID
user_id = key.decode().split(":")[1]
# 检查最后活跃时间
last_active = self.redis_conn.hget(key, "last_active")
if last_active:
last_active = int(last_active)
if time.time() - last_active < minutes * 60:
active_users.add(user_id)
if cursor == 0:
break
return list(active_users)
九、性能优化技巧
1. 使用管道(Pipeline)
def use_pipeline():
"""使用管道批量操作"""
redis_conn = get_redis_connection("default")
# 创建管道
pipe = redis_conn.pipeline()
# 批量添加命令
for i in range(100):
pipe.set(f"key:{i}", f"value:{i}")
pipe.expire(f"key:{i}", 3600)
# 一次执行所有命令
results = pipe.execute()
return len(results) # 200个结果
2. 使用Lua脚本
def use_lua_script():
"""使用Lua脚本实现原子操作"""
redis_conn = get_redis_connection("default")
# 原子增加分数并更新排行榜
lua_script = """
local player = KEYS[1]
local score = tonumber(ARGV[1])
local leaderboard = KEYS[2]
-- 增加分数
local new_score = redis.call('ZINCRBY', leaderboard, score, player)
-- 获取排名
local rank = redis.call('ZREVRANK', leaderboard, player)
-- 返回结果
return {new_score, rank}
"""
# 执行脚本
result = redis_conn.eval(
lua_script,
2, # 2个KEYS
"player_123", # KEYS[1]
"leaderboard", # KEYS[2]
100 # ARGV[1]
)
return {
"new_score": float(result[0]),
"rank": int(result[1]) + 1 if result[1] is not None else None
}
十、总结
通过原生Redis连接,你可以在Django中使用Redis的所有数据类型:
| 数据类型 | 主要用途 | Django对应 |
|---|---|---|
| 字符串 | 简单缓存、计数器 | Django缓存API |
| 列表 | 消息队列、时间线 | 无,需手动实现 |
| 集合 | 标签、好友关系 | 无,需手动实现 |
| 有序集合 | 排行榜、优先级队列 | 无,需手动实现 |
| 哈希 | 对象存储、购物车 | 无,需手动实现 |
最佳实践:
- 简单缓存使用Django缓存API
- 复杂数据结构使用原生Redis连接
- 批量操作使用管道
- 原子操作使用Lua脚本
- 合理设置过期时间,避免内存泄漏
这样,你就可以充分利用Redis的所有功能来构建高性能的Django应用了。
以上就是Django中使用django-redis库与Redis交互API指南的详细内容,更多关于Django django-redis与Redis交互API的资料请关注脚本之家其它相关文章!
