python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > django db性能调优

浅谈django的db性能调优

作者:哈里谢顿

本文主要介绍了浅谈django的db性能调优,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1 环境准备

构建一个user_demo表

from django.db import models

class UserInfo(models.Model):
    # 1. 基础字段
    username = models.CharField(max_length=50, unique=True)
    # 给 email 一个默认空字符串,或者设为 null=True,防止迁移报错
    email = models.EmailField(default='') 

    # 2. 状态字段
    status = models.SmallIntegerField(default=1, help_text="0:Inactive, 1:Active, 2:Banned")
    role = models.CharField(max_length=10, default='member')

    # 3. 数值字段
    age = models.PositiveSmallIntegerField(null=True)
    balance = models.DecimalField(max_digits=19, decimal_places=4, default=0.0000)
    score = models.IntegerField(default=0)

    # 4. 时间字段
    # 使用 auto_now_add=True 让 Django 自动处理创建时间
    created_at = models.DateTimeField(auto_now_add=True) 
    last_login = models.DateTimeField(null=True, blank=True)

    # 5. 大文本/复杂字段
    bio = models.TextField(null=True, blank=True)
    
    # 6. JSON 字段
    extras = models.JSONField(default=dict, null=True, blank=True)

    class Meta:
        db_table = 'user_demo'

迁移

python manage.py makemigration
python manage.py migrate

2 创建调优

使用bulk_create创建1万条数据

import os
import sys
import random
import string
import django

# 1. 设置 Django 环境
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django.setup()

from main.models import UserInfo

def generate_random_string(length=10):
    """生成随机字符串"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

def create_demo_data(count=100):
    print(f"开始生成 {count} 条数据...")
    
    users = []
    # 预先定义好 choices,模拟真实数据分布
    ROLES = ['admin', 'editor', 'member', 'guest']
    
    for i in range(count):
        # 确保 username 唯一,这里简单加上索引 i
        username = f"user_{generate_random_string(5)}_{i}"
        
        user = UserInfo(
            username=username,
            email=f"{username}@example.com",
            status=random.choice([0, 1, 1, 1, 2]), # 1 的概率大一些
            role=random.choice(ROLES),
            age=random.randint(18, 80),
            balance=random.uniform(0, 100000),
            score=random.randint(0, 500000),
            bio=generate_random_string(500),
            extras={'preference': random.choice(['light', 'dark']), 'level': random.randint(1, 10)}
        )
        users.append(user)

    # bulk_create 一次性插入,性能更好
    UserInfo.objects.bulk_create(users)
    print(f"✅ 成功插入 {len(users)} 条数据到 UserInfo 表。")

if __name__ == "__main__":
    try:
        create_demo_data(1000000)
    except Exception as e:
        print(f"❌ 发生错误: {e}")

创建成功

mysql> select count(*) from user_demo;
+----------+
| count(*) |
+----------+
|    10000 |
+----------+
1 row in set (0.002 sec)

构建10万条失败,失败信息为:

开始生成 1000000 条数据...
❌ 发生错误: (2013, 'Lost connection to MySQL server during query')

采取分批插入的方式来解决问题

import os
import sys
import random
import string
import django

# 1. 设置 Django 环境
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django.setup()

from main.models import UserInfo

def generate_random_string(length=10):
    """生成随机字符串"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

def create_demo_data(count=100):
    print(f"开始生成 {count} 条数据...")
    
    users = []
    BATCH_SIZE = 5000  # 每次插入 5000 条
    # 预先定义好 choices,模拟真实数据分布
    ROLES = ['admin', 'editor', 'member', 'guest']
    batch_data = []
    for i in range(count):
        # 确保 username 唯一,这里简单加上索引 i
        username = f"user_{generate_random_string(5)}_{i}"
        
        user = UserInfo(
            username=username,
            email=f"{username}@example.com",
            status=random.choice([0, 1, 1, 1, 2]), # 1 的概率大一些
            role=random.choice(ROLES),
            age=random.randint(18, 80),
            balance=random.uniform(0, 100000),
            score=random.randint(0, 500000),
            bio=generate_random_string(500),
            extras={'preference': random.choice(['light', 'dark']), 'level': random.randint(1, 10)}
        )

        batch_data.append(user)
        # 当积攒够了 BATCH_SIZE,就插入一次
        if len(batch_data) >= BATCH_SIZE:
            UserInfo.objects.bulk_create(batch_data, batch_size=BATCH_SIZE)
            print(f"已插入 {i + 1} 条...")
            batch_data = [] # 清空列表

    # 插入剩余的数据(如果有的话)
    if batch_data:
        UserInfo.objects.bulk_create(batch_data)
        print(f"已插入剩余 {len(batch_data)} 条...")
    print(f"✅ 成功插入 {len(users)} 条数据到 UserInfo 表。")

if __name__ == "__main__":
    try:
        create_demo_data(1000000)
    except Exception as e:
        print(f"❌ 发生错误: {e}")

3 查找优化

这种查找,性能非常操作,因为本质上是在做:SELECT * FROM table ...

from main.models import UserInfo

users = UserInfo.objects.all()
users_usernames = [user.username for user in users]

改成

from main.models import UserInfo

users_usernames = list(UserInfo.objects.values_list('username', flat=True))

到此这篇关于浅谈django的db性能调优的文章就介绍到这了,更多相关django db性能调优内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文