python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Django django-redis与Redis交互API

Django中使用django-redis库与Redis交互API指南

作者:Python游侠

这篇文章主要介绍了Django缓存与原生Redis的区别,以及如何在Django中使用Redis的各种数据类型进行缓存,文章还提供了一些实战场景和性能优化技巧,帮助开发者充分利用Redis的强大功能来构建高性能的应用,需要的朋友可以参考下

一、理解Django缓存与原生Redis的区别

Django缓存APIRedis原生数据类型用途
键值对存储字符串(String)简单缓存
不支持列表(List)消息队列、最新列表
不支持集合(Set)去重、共同好友
不支持有序集合(Sorted Set)排行榜、优先级队列
不支持哈希(Hash)对象存储、多个字段

二、获取原生Redis连接

要在Django中使用Redis的所有数据类型,需要获取原生Redis连接:

# 方法1:通过django-redis获取连接
from django_redis import get_redis_connection

# 获取默认缓存对应的Redis连接
redis_conn = get_redis_connection("default")

# 获取特定缓存的连接
session_conn = get_redis_connection("session")

# 方法2:直接创建连接(不推荐,缺少连接池管理)
import redis
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password=None
)

三、字符串类型(String)操作

虽然Django缓存API支持字符串,但Redis有更多功能:

1. 带过期时间的操作

def string_operations():
    """Redis字符串操作"""
    redis_conn = get_redis_connection("default")
    
    # 1. 设置带过期时间的键
    redis_conn.setex("session:user:123", 3600, "session_data")  # 1小时后过期
    
    # 2. 设置多个键
    redis_conn.mset({"key1": "value1", "key2": "value2"})
    
    # 3. 获取字符串的一部分
    redis_conn.set("message", "Hello World")
    part = redis_conn.getrange("message", 0, 4)  # "Hello"
    
    # 4. 追加字符串
    redis_conn.append("message", " Redis!")
    
    # 5. 获取字符串长度
    length = redis_conn.strlen("message")
    
    # 6. 设置键的值并返回旧值
    old_value = redis_conn.getset("counter", "100")
    
    # 7. 位操作
    redis_conn.setbit("user:online:2023-10-01", 123, 1)  # 用户123在线
    redis_conn.getbit("user:online:2023-10-01", 123)  # 检查是否在线
    
    return {
        "part": part,
        "length": length,
        "old_value": old_value
    }

四、列表类型(List)操作

1. 基本列表操作

def list_operations():
    """Redis列表操作"""
    redis_conn = get_redis_connection("default")
    
    # 1. 左侧推入元素
    redis_conn.lpush("recent_users", "user_123", "user_456", "user_789")
    
    # 2. 右侧推入元素
    redis_conn.rpush("task_queue", "task1", "task2", "task3")
    
    # 3. 左侧弹出元素
    first_user = redis_conn.lpop("recent_users")
    
    # 4. 右侧弹出元素
    last_task = redis_conn.rpop("task_queue")
    
    # 5. 获取列表长度
    length = redis_conn.llen("recent_users")
    
    # 6. 获取指定范围的元素
    users = redis_conn.lrange("recent_users", 0, 9)  # 前10个元素
    
    # 7. 移除元素
    redis_conn.lrem("recent_users", 1, "user_123")  # 删除1个user_123
    
    # 8. 通过索引获取元素
    user = redis_conn.lindex("recent_users", 0)  # 获取第一个
    
    # 9. 设置指定索引的值
    redis_conn.lset("recent_users", 0, "new_user")
    
    return {
        "first_user": first_user,
        "last_task": last_task,
        "length": length,
        "users": users
    }

2. 实战:消息队列

class MessageQueue:
    """基于Redis列表的消息队列"""
    
    def __init__(self, queue_name="default_queue"):
        self.redis_conn = get_redis_connection("default")
        self.queue_name = queue_name
    
    def enqueue(self, message):
        """入队"""
        import json
        message_str = json.dumps(message)
        return self.redis_conn.rpush(self.queue_name, message_str)
    
    def dequeue(self, timeout=0):
        """出队(阻塞)"""
        import json
        result = self.redis_conn.blpop(self.queue_name, timeout=timeout)
        if result:
            queue_name, message_str = result
            return json.loads(message_str)
        return None
    
    def size(self):
        """队列大小"""
        return self.redis_conn.llen(self.queue_name)
    
    def peek(self):
        """查看但不移除"""
        import json
        message_str = self.redis_conn.lindex(self.queue_name, 0)
        if message_str:
            return json.loads(message_str)
        return None

# 使用示例
def process_messages():
    """处理消息队列"""
    queue = MessageQueue("email_queue")
    
    # 生产者
    queue.enqueue({
        "to": "user@example.com",
        "subject": "Welcome",
        "body": "Hello!"
    })
    
    # 消费者
    while True:
        message = queue.dequeue(timeout=5)  # 阻塞5秒
        if message:
            # 处理消息
            send_email(**message)
        else:
            # 没有消息,休息一下
            import time
            time.sleep(1)

五、集合类型(Set)操作

1. 基本集合操作

def set_operations():
    """Redis集合操作"""
    redis_conn = get_redis_connection("default")
    
    # 1. 添加元素
    redis_conn.sadd("user_tags:123", "python", "django", "redis")
    redis_conn.sadd("user_tags:456", "python", "javascript", "vue")
    
    # 2. 获取所有元素
    tags_123 = redis_conn.smembers("user_tags:123")
    
    # 3. 判断元素是否存在
    is_member = redis_conn.sismember("user_tags:123", "python")
    
    # 4. 移除元素
    redis_conn.srem("user_tags:123", "redis")
    
    # 5. 获取集合大小
    size = redis_conn.scard("user_tags:123")
    
    # 6. 随机获取元素
    random_tag = redis_conn.srandmember("user_tags:123", 1)
    
    # 7. 集合运算
    # 交集:都喜欢的标签
    common_tags = redis_conn.sinter("user_tags:123", "user_tags:456")
    
    # 并集:所有标签
    all_tags = redis_conn.sunion("user_tags:123", "user_tags:456")
    
    # 差集:用户123有但456没有的标签
    diff_tags = redis_conn.sdiff("user_tags:123", "user_tags:456")
    
    return {
        "tags_123": tags_123,
        "is_member": is_member,
        "size": size,
        "common_tags": common_tags
    }

2. 实战:用户标签系统

class UserTagSystem:
    """基于Redis集合的用户标签系统"""
    
    def __init__(self):
        self.redis_conn = get_redis_connection("default")
    
    def add_tag(self, user_id, tag):
        """为用户添加标签"""
        return self.redis_conn.sadd(f"user_tags:{user_id}", tag)
    
    def remove_tag(self, user_id, tag):
        """移除用户标签"""
        return self.redis_conn.srem(f"user_tags:{user_id}", tag)
    
    def get_user_tags(self, user_id):
        """获取用户所有标签"""
        return self.redis_conn.smembers(f"user_tags:{user_id}")
    
    def has_tag(self, user_id, tag):
        """检查用户是否有某个标签"""
        return self.redis_conn.sismember(f"user_tags:{user_id}", tag)
    
    def find_users_with_tag(self, tag):
        """查找具有某个标签的所有用户"""
        # 需要遍历所有用户,实际应用中可以使用反向索引
        pattern = "user_tags:*"
        matching_users = []
        
        # 使用SCAN避免阻塞
        cursor = 0
        while True:
            cursor, keys = self.redis_conn.scan(cursor, pattern, count=100)
            for key in keys:
                if self.redis_conn.sismember(key, tag):
                    user_id = key.decode().split(":")[1]
                    matching_users.append(user_id)
            
            if cursor == 0:
                break
        
        return matching_users
    
    def get_common_tags(self, user_ids):
        """获取多个用户的共同标签"""
        if len(user_ids) < 2:
            return []
        
        keys = [f"user_tags:{user_id}" for user_id in user_ids]
        return self.redis_conn.sinter(*keys)
    
    def get_recommended_tags(self, user_id):
        """根据相似用户推荐标签"""
        user_tags = self.get_user_tags(user_id)
        if not user_tags:
            return []
        
        # 查找有相同标签的用户
        similar_users = []
        for tag in user_tags:
            users_with_tag = self.find_users_with_tag(tag)
            similar_users.extend(users_with_tag)
        
        # 去重并排除自己
        similar_users = set(similar_users) - {user_id}
        
        # 收集相似用户的标签
        all_tags = set()
        for similar_user in list(similar_users)[:10]:  # 只看前10个相似用户
            tags = self.get_user_tags(similar_user)
            all_tags.update(tags)
        
        # 排除用户已有的标签
        recommended = all_tags - set(user_tags)
        
        return list(recommended)[:10]  # 返回前10个推荐标签

六、有序集合类型(Sorted Set)操作

1. 基本有序集合操作

def sorted_set_operations():
    """Redis有序集合操作"""
    redis_conn = get_redis_connection("default")
    
    # 1. 添加元素(带分数)
    redis_conn.zadd("leaderboard", {
        "player1": 1000,
        "player2": 1500,
        "player3": 800,
        "player4": 2000
    })
    
    # 2. 增加分数
    redis_conn.zincrby("leaderboard", 100, "player1")  # player1增加100分
    
    # 3. 按分数升序获取
    ascending = redis_conn.zrange("leaderboard", 0, -1, withscores=True)
    
    # 4. 按分数降序获取
    descending = redis_conn.zrevrange("leaderboard", 0, -1, withscores=True)
    
    # 5. 获取排名
    rank = redis_conn.zrevrank("leaderboard", "player1")  # 降序排名,0为第一
    
    # 6. 获取分数
    score = redis_conn.zscore("leaderboard", "player1")
    
    # 7. 获取分数范围内的元素
    top_players = redis_conn.zrangebyscore("leaderboard", 1000, 3000, withscores=True)
    
    # 8. 移除元素
    redis_conn.zrem("leaderboard", "player4")
    
    # 9. 获取集合大小
    size = redis_conn.zcard("leaderboard")
    
    # 10. 统计分数区间的元素数量
    count = redis_conn.zcount("leaderboard", 1000, 3000)
    
    return {
        "rank": rank,
        "score": score,
        "top_3": descending[:3],
        "count": count
    }

2. 实战:排行榜系统

class Leaderboard:
    """基于Redis有序集合的排行榜系统"""
    
    def __init__(self, leaderboard_name="default_leaderboard"):
        self.redis_conn = get_redis_connection("default")
        self.name = leaderboard_name
    
    def add_score(self, player_id, score):
        """添加或更新分数"""
        return self.redis_conn.zincrby(self.name, score, player_id)
    
    def get_top_n(self, n=10):
        """获取前N名"""
        return self.redis_conn.zrevrange(
            self.name, 0, n-1, withscores=True
        )
    
    def get_player_rank(self, player_id):
        """获取玩家排名(从1开始)"""
        rank = self.redis_conn.zrevrank(self.name, player_id)
        if rank is not None:
            return rank + 1
        return None
    
    def get_player_score(self, player_id):
        """获取玩家分数"""
        return self.redis_conn.zscore(self.name, player_id)
    
    def get_players_around(self, player_id, range_size=5):
        """获取玩家附近的排名"""
        rank = self.get_player_rank(player_id)
        if rank is None:
            return []
        
        start = max(0, rank - 1 - range_size)
        end = rank - 1 + range_size
        
        players = self.redis_conn.zrevrange(
            self.name, start, end, withscores=True
        )
        
        return [
            {"player": player, "score": score, "rank": idx + start + 1}
            for idx, (player, score) in enumerate(players)
        ]
    
    def get_season_leaderboard(self, season_id, top_n=100):
        """获取赛季排行榜"""
        season_key = f"{self.name}:season:{season_id}"
        
        # 检查是否有缓存的赛季排行榜
        cached = self.redis_conn.get(season_key)
        if cached:
            import json
            return json.loads(cached)
        
        # 计算赛季排行榜
        season_end = int(season_id)  # 假设season_id是时间戳
        season_start = season_end - 86400 * 30  # 30天
        
        # 获取赛季期间有分数的玩家
        all_players = self.redis_conn.zrevrangebyscore(
            f"{self.name}:daily:{season_end}",
            season_start, season_end,
            withscores=False
        )
        
        # 计算每个玩家的赛季总分
        pipe = self.redis_conn.pipeline()
        for player in all_players:
            pipe.zscore(f"{self.name}:daily", player)
        
        scores = pipe.execute()
        
        # 创建有序集合
        season_data = {player: score for player, score in zip(all_players, scores) if score}
        
        if season_data:
            season_leaderboard_key = f"{self.name}:season_temp:{season_id}"
            self.redis_conn.zadd(season_leaderboard_key, season_data)
            
            # 获取前N名
            top_players = self.redis_conn.zrevrange(
                season_leaderboard_key, 0, top_n-1, withscores=True
            )
            
            # 删除临时键
            self.redis_conn.delete(season_leaderboard_key)
            
            # 缓存结果
            import json
            self.redis_conn.setex(
                season_key,
                3600,  # 缓存1小时
                json.dumps(top_players)
            )
            
            return top_players
        
        return []


# 游戏积分系统示例
class GameScoreSystem:
    """游戏积分系统"""
    
    def __init__(self, game_id):
        self.redis_conn = get_redis_connection("default")
        self.game_id = game_id
        self.leaderboard = Leaderboard(f"game:{game_id}:leaderboard")
    
    def record_score(self, user_id, score, level, time_spent):
        """记录游戏得分"""
        import time
        timestamp = int(time.time())
        
        # 1. 更新总排行榜
        self.leaderboard.add_score(user_id, score)
        
        # 2. 记录详细得分(使用哈希)
        score_key = f"game:{self.game_id}:score:{user_id}:{timestamp}"
        self.redis_conn.hmset(score_key, {
            "score": score,
            "level": level,
            "time_spent": time_spent,
            "timestamp": timestamp
        })
        
        # 设置24小时过期
        self.redis_conn.expire(score_key, 86400)
        
        # 3. 更新每日排行榜
        date_str = time.strftime("%Y%m%d")
        daily_key = f"game:{self.game_id}:daily:{date_str}"
        self.redis_conn.zincrby(daily_key, score, user_id)
        
        # 4. 更新关卡排行榜
        level_key = f"game:{self.game_id}:level:{level}"
        self.redis_conn.zincrby(level_key, 1, user_id)  # 通关次数
        
        return {
            "rank": self.leaderboard.get_player_rank(user_id),
            "score": self.leaderboard.get_player_score(user_id)
        }

七、哈希类型(Hash)操作

1. 基本哈希操作

def hash_operations():
    """Redis哈希操作"""
    redis_conn = get_redis_connection("default")
    
    # 1. 设置单个字段
    redis_conn.hset("user:123", "name", "张三")
    
    # 2. 设置多个字段
    redis_conn.hmset("user:123", {
        "age": 25,
        "email": "zhangsan@example.com",
        "city": "北京"
    })
    
    # 3. 获取单个字段
    name = redis_conn.hget("user:123", "name")
    
    # 4. 获取多个字段
    fields = redis_conn.hmget("user:123", ["name", "age", "email"])
    
    # 5. 获取所有字段
    all_fields = redis_conn.hgetall("user:123")
    
    # 6. 获取所有字段名
    field_names = redis_conn.hkeys("user:123")
    
    # 7. 获取所有字段值
    field_values = redis_conn.hvals("user:123")
    
    # 8. 检查字段是否存在
    exists = redis_conn.hexists("user:123", "name")
    
    # 9. 删除字段
    redis_conn.hdel("user:123", "city")
    
    # 10. 获取字段数量
    field_count = redis_conn.hlen("user:123")
    
    # 11. 增加数值字段的值
    redis_conn.hincrby("user:123", "age", 1)  # 增加1岁
    
    return {
        "name": name,
        "fields": fields,
        "exists": exists,
        "field_count": field_count
    }

2. 实战:购物车系统

class ShoppingCart:
    """基于Redis哈希的购物车系统"""
    
    def __init__(self, cart_id=None):
        self.redis_conn = get_redis_connection("default")
        self.cart_id = cart_id or f"cart:{int(time.time())}"
    
    def add_item(self, product_id, quantity=1, price=None):
        """添加商品到购物车"""
        cart_key = f"cart:items:{self.cart_id}"
        
        # 获取当前数量
        current_quantity = self.redis_conn.hget(cart_key, product_id)
        if current_quantity:
            quantity = int(current_quantity) + quantity
        
        # 更新购物车
        self.redis_conn.hset(cart_key, product_id, quantity)
        
        # 如果提供了价格,存储到商品信息
        if price is not None:
            product_key = f"cart:product_info:{self.cart_id}"
            self.redis_conn.hset(product_key, product_id, price)
        
        return quantity
    
    def remove_item(self, product_id, quantity=None):
        """从购物车移除商品"""
        cart_key = f"cart:items:{self.cart_id}"
        
        if quantity is None:
            # 移除整个商品
            self.redis_conn.hdel(cart_key, product_id)
            
            # 同时移除价格信息
            product_key = f"cart:product_info:{self.cart_id}"
            self.redis_conn.hdel(product_key, product_id)
        else:
            # 减少数量
            current_quantity = self.redis_conn.hget(cart_key, product_id)
            if current_quantity:
                new_quantity = int(current_quantity) - quantity
                if new_quantity > 0:
                    self.redis_conn.hset(cart_key, product_id, new_quantity)
                else:
                    self.remove_item(product_id)
    
    def get_items(self):
        """获取购物车所有商品"""
        cart_key = f"cart:items:{self.cart_id}"
        items = self.redis_conn.hgetall(cart_key)
        
        product_key = f"cart:product_info:{self.cart_id}"
        prices = self.redis_conn.hgetall(product_key)
        
        result = []
        for product_id, quantity in items.items():
            product_id = product_id.decode() if isinstance(product_id, bytes) else product_id
            quantity = int(quantity)
            
            price = prices.get(product_id.encode() if isinstance(product_id, str) else product_id)
            if price:
                price = float(price.decode() if isinstance(price, bytes) else price)
            
            result.append({
                "product_id": product_id,
                "quantity": quantity,
                "price": price,
                "subtotal": price * quantity if price else None
            })
        
        return result
    
    def get_total(self):
        """计算购物车总价"""
        items = self.get_items()
        return sum(item.get("subtotal", 0) for item in items)
    
    def clear(self):
        """清空购物车"""
        cart_key = f"cart:items:{self.cart_id}"
        product_key = f"cart:product_info:{self.cart_id}"
        
        self.redis_conn.delete(cart_key, product_key)
    
    def get_item_count(self):
        """获取购物车商品总数"""
        cart_key = f"cart:items:{self.cart_id}"
        items = self.redis_conn.hvals(cart_key)
        return sum(int(q) for q in items) if items else 0
    
    def checkout(self):
        """结账"""
        items = self.get_items()
        
        if not items:
            return {"success": False, "message": "购物车为空"}
        
        # 生成订单
        import uuid
        order_id = str(uuid.uuid4())
        
        # 保存订单
        order_key = f"order:{order_id}"
        self.redis_conn.hmset(order_key, {
            "cart_id": self.cart_id,
            "total": self.get_total(),
            "created_at": int(time.time()),
            "status": "pending"
        })
        
        # 保存订单商品
        for item in items:
            self.redis_conn.hset(
                f"{order_key}:items",
                item["product_id"],
                item["quantity"]
            )
        
        # 清空购物车
        self.clear()
        
        return {
            "success": True,
            "order_id": order_id,
            "total": self.get_total(),
            "item_count": len(items)
        }

八、高级数据结构组合应用

1. 社交关系系统

class SocialNetwork:
    """基于Redis的社交关系系统"""
    
    def __init__(self):
        self.redis_conn = get_redis_connection("default")
    
    def follow(self, follower_id, followee_id):
        """关注用户"""
        # 添加到关注列表(集合)
        self.redis_conn.sadd(f"user:{follower_id}:following", followee_id)
        self.redis_conn.sadd(f"user:{followee_id}:followers", follower_id)
        
        # 记录时间线(有序集合)
        import time
        timestamp = time.time()
        
        # 获取关注用户的动态
        posts_key = f"user:{followee_id}:posts"
        posts = self.redis_conn.zrevrangebyscore(
            posts_key, timestamp - 86400 * 7, timestamp,  # 最近7天的帖子
            withscores=True
        )
        
        # 添加到关注者的时间线
        timeline_key = f"user:{follower_id}:timeline"
        for post_id, post_time in posts:
            self.redis_conn.zadd(timeline_key, {post_id: post_time})
        
        # 限制时间线长度
        self.redis_conn.zremrangebyrank(timeline_key, 0, -1000)  # 只保留最新的1000条
    
    def unfollow(self, follower_id, followee_id):
        """取消关注"""
        self.redis_conn.srem(f"user:{follower_id}:following", followee_id)
        self.redis_conn.srem(f"user:{followee_id}:followers", follower_id)
    
    def get_followers(self, user_id, page=1, per_page=20):
        """获取粉丝列表"""
        key = f"user:{user_id}:followers"
        start = (page - 1) * per_page
        end = start + per_page - 1
        
        # 使用集合获取粉丝
        followers = self.redis_conn.smembers(key)
        
        # 获取粉丝详细信息
        followers_list = []
        for follower_id in list(followers)[start:end+1]:
            user_info = self.redis_conn.hgetall(f"user:{follower_id}")
            followers_list.append(user_info)
        
        return {
            "followers": followers_list,
            "total": self.redis_conn.scard(key),
            "page": page,
            "per_page": per_page
        }
    
    def get_mutual_followers(self, user1_id, user2_id):
        """获取共同关注的人"""
        key1 = f"user:{user1_id}:following"
        key2 = f"user:{user2_id}:following"
        
        return self.redis_conn.sinter(key1, key2)
    
    def post_update(self, user_id, content):
        """发布动态"""
        import time
        import uuid
        
        post_id = str(uuid.uuid4())
        timestamp = time.time()
        
        # 存储帖子内容(哈希)
        post_key = f"post:{post_id}"
        self.redis_conn.hmset(post_key, {
            "id": post_id,
            "user_id": user_id,
            "content": content,
            "timestamp": timestamp,
            "likes": 0,
            "comments": 0
        })
        
        # 添加到用户的帖子列表(有序集合)
        user_posts_key = f"user:{user_id}:posts"
        self.redis_conn.zadd(user_posts_key, {post_id: timestamp})
        
        # 添加到粉丝的时间线
        followers_key = f"user:{user_id}:followers"
        followers = self.redis_conn.smembers(followers_key)
        
        for follower_id in followers:
            timeline_key = f"user:{follower_id}:timeline"
            self.redis_conn.zadd(timeline_key, {post_id: timestamp})
            self.redis_conn.zremrangebyrank(timeline_key, 0, -1000)
        
        return post_id
    
    def get_timeline(self, user_id, page=1, per_page=20):
        """获取时间线"""
        timeline_key = f"user:{user_id}:timeline"
        start = (page - 1) * per_page
        end = start + per_page - 1
        
        # 获取帖子ID
        post_ids = self.redis_conn.zrevrange(timeline_key, start, end)
        
        # 获取帖子详情
        posts = []
        for post_id in post_ids:
            post = self.redis_conn.hgetall(f"post:{post_id}")
            if post:
                posts.append(post)
        
        return {
            "posts": posts,
            "total": self.redis_conn.zcard(timeline_key),
            "page": page,
            "per_page": per_page
        }

2. 实时统计系统

class RealTimeStatistics:
    """实时统计系统"""
    
    def __init__(self):
        self.redis_conn = get_redis_connection("default")
    
    def record_event(self, event_type, user_id=None, data=None):
        """记录事件"""
        import time
        timestamp = int(time.time())
        
        # 1. 记录到全局事件流(列表)
        event_id = f"{timestamp}:{user_id}:{event_type}"
        self.redis_conn.lpush("events:stream", event_id)
        
        # 2. 按类型计数(哈希)
        today = time.strftime("%Y%m%d")
        daily_key = f"stats:events:{today}"
        self.redis_conn.hincrby(daily_key, event_type, 1)
        
        # 3. 记录用户事件(有序集合)
        if user_id:
            user_events_key = f"user:{user_id}:events"
            self.redis_conn.zadd(user_events_key, {event_type: timestamp})
            
            # 用户最后活跃时间
            self.redis_conn.hset(f"user:{user_id}:activity", "last_active", timestamp)
        
        # 4. 按小时统计(哈希)
        hour = time.strftime("%Y%m%d%H")
        hour_key = f"stats:hourly:{hour}"
        self.redis_conn.hincrby(hour_key, event_type, 1)
        
        return event_id
    
    def get_daily_stats(self, date_str=None):
        """获取每日统计"""
        import time
        if not date_str:
            date_str = time.strftime("%Y%m%d")
        
        key = f"stats:events:{date_str}"
        return self.redis_conn.hgetall(key)
    
    def get_hourly_trend(self, event_type, hours=24):
        """获取小时趋势"""
        import time
        from datetime import datetime, timedelta
        
        now = datetime.now()
        trend = []
        
        for i in range(hours):
            hour = now - timedelta(hours=i)
            hour_str = hour.strftime("%Y%m%d%H")
            key = f"stats:hourly:{hour_str}"
            
            count = self.redis_conn.hget(key, event_type)
            trend.append({
                "hour": hour_str,
                "count": int(count) if count else 0
            })
        
        return list(reversed(trend))  # 从早到晚
    
    def get_active_users(self, minutes=5):
        """获取活跃用户(最近N分钟有活动的用户)"""
        import time
        
        # 使用集合存储活跃用户
        active_users_key = "users:active"
        
        # 获取所有用户
        pattern = "user:*:activity"
        active_users = set()
        
        cursor = 0
        while True:
            cursor, keys = self.redis_conn.scan(cursor, pattern, count=100)
            
            for key in keys:
                # 解析用户ID
                user_id = key.decode().split(":")[1]
                
                # 检查最后活跃时间
                last_active = self.redis_conn.hget(key, "last_active")
                if last_active:
                    last_active = int(last_active)
                    if time.time() - last_active < minutes * 60:
                        active_users.add(user_id)
            
            if cursor == 0:
                break
        
        return list(active_users)

九、性能优化技巧

1. 使用管道(Pipeline)

def use_pipeline():
    """使用管道批量操作"""
    redis_conn = get_redis_connection("default")
    
    # 创建管道
    pipe = redis_conn.pipeline()
    
    # 批量添加命令
    for i in range(100):
        pipe.set(f"key:{i}", f"value:{i}")
        pipe.expire(f"key:{i}", 3600)
    
    # 一次执行所有命令
    results = pipe.execute()
    
    return len(results)  # 200个结果

2. 使用Lua脚本

def use_lua_script():
    """使用Lua脚本实现原子操作"""
    redis_conn = get_redis_connection("default")
    
    # 原子增加分数并更新排行榜
    lua_script = """
    local player = KEYS[1]
    local score = tonumber(ARGV[1])
    local leaderboard = KEYS[2]
    
    -- 增加分数
    local new_score = redis.call('ZINCRBY', leaderboard, score, player)
    
    -- 获取排名
    local rank = redis.call('ZREVRANK', leaderboard, player)
    
    -- 返回结果
    return {new_score, rank}
    """
    
    # 执行脚本
    result = redis_conn.eval(
        lua_script,
        2,  # 2个KEYS
        "player_123",  # KEYS[1]
        "leaderboard",  # KEYS[2]
        100  # ARGV[1]
    )
    
    return {
        "new_score": float(result[0]),
        "rank": int(result[1]) + 1 if result[1] is not None else None
    }

十、总结

通过原生Redis连接,你可以在Django中使用Redis的所有数据类型:

数据类型主要用途Django对应
字符串简单缓存、计数器Django缓存API
列表消息队列、时间线无,需手动实现
集合标签、好友关系无,需手动实现
有序集合排行榜、优先级队列无,需手动实现
哈希对象存储、购物车无,需手动实现

最佳实践:

  1. 简单缓存使用Django缓存API
  2. 复杂数据结构使用原生Redis连接
  3. 批量操作使用管道
  4. 原子操作使用Lua脚本
  5. 合理设置过期时间,避免内存泄漏

这样,你就可以充分利用Redis的所有功能来构建高性能的Django应用了。

以上就是Django中使用django-redis库与Redis交互API指南的详细内容,更多关于Django django-redis与Redis交互API的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文