python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python数据库操作库

Python中数据库操作库实战示例

作者:小小测试开发

本文将详细介绍Python中最主流、最实用的数据库操作库,涵盖关系型数据库(MySQL、PostgreSQL、SQLite)和非关系型数据库(MongoDB、Redis)对应的操作库,感兴趣的朋友跟随小编一起看看吧

在Python开发中,数据库操作是高频需求,无论是Web开发、数据分析、自动化测试,还是后台服务,都需要通过代码与数据库交互(增删改查、连接管理等)。Python生态中提供了丰富的数据库操作库,不同库适配不同数据库类型、场景需求,各有优劣。

本文将详细介绍Python中最主流、最实用的数据库操作库,涵盖关系型数据库(MySQL、PostgreSQL、SQLite)和非关系型数据库(MongoDB、Redis)对应的操作库,每个库均包含「库介绍→安装步骤→核心用法→实战示例→注意事项」,兼顾初学者入门和开发者实战复用,全程贴合Markdown排版,结构清晰、内容详实。

注:所有示例均基于Python 3.8+版本,代码可直接复制运行(需提前安装对应库、配置数据库环境);重点讲解“通用操作逻辑”,避免冗余,同时补充生产环境必备的异常处理、连接池等技巧。

一、前言:Python数据库操作的核心逻辑

无论使用哪种数据库、哪个操作库,Python与数据库交互的核心逻辑一致,可概括为4步,统一梳理如下,后续所有库的示例均遵循此逻辑:

补充:生产环境中,不建议频繁创建/关闭连接(耗时且耗资源),优先使用「连接池」管理连接;同时需添加异常处理,避免因数据库异常(如连接失败、SQL错误)导致程序崩溃。

二、关系型数据库操作库(重点)

关系型数据库(MySQL、PostgreSQL、SQLite)是最常用的数据库类型,特点是数据结构化、支持SQL语句、事务安全。Python中对应操作库均遵循“连接→操作→关闭”的逻辑,以下重点讲解3个主流库。

1. MySQLdb / PyMySQL(适配MySQL数据库)

MySQL是Python开发中最常用的关系型数据库,对应的操作库有两个:MySQLdbPyMySQL,两者功能一致、用法几乎相同,核心区别在于:MySQLdb仅支持Python 2.x,且仅支持Windows系统;PyMySQL支持Python 3.x,跨平台(Windows、Linux、Mac),是目前的首选。

本文重点讲解 PyMySQL(主流、通用),替代过时的MySQLdb。

(1)库介绍

(2)安装步骤

使用pip命令安装,直接执行以下命令(无需额外依赖):

# 安装PyMySQL(推荐)
pip install pymysql
# 若需安装MySQLdb(仅Python 2.x,不推荐)
# pip install mysqldb

(3)核心用法与实战示例

示例场景:连接本地MySQL数据库(数据库名:test_db,用户名:root,密码:123456),实现「建表→插入数据→查询数据→更新数据→删除数据」的完整操作,包含异常处理和连接关闭。

import pymysql
from pymysql import Error
def mysql_demo():
    # 1. 数据库连接参数(根据实际环境修改)
    db_config = {
        "host": "localhost",  # 数据库地址(本地为localhost)
        "port": 3306,         # 数据库端口(MySQL默认3306)
        "user": "root",       # 用户名
        "password": "123456", # 密码
        "database": "test_db" # 要连接的数据库名
    }
    # 初始化连接和游标
    connection = None
    cursor = None
    try:
        # 2. 建立数据库连接
        connection = pymysql.connect(**db_config)
        if connection.open:
            print("✅ 成功连接到MySQL数据库")
        # 3. 创建游标对象(用于执行SQL语句)
        cursor = connection.cursor()
        # 4. 执行操作(建表→插入→查询→更新→删除)
        # 4.1 建表(若表不存在)
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS user_info (
            id INT AUTO_INCREMENT PRIMARY KEY,  # 自增主键
            name VARCHAR(50) NOT NULL,         # 姓名(非空)
            age INT,                           # 年龄
            gender VARCHAR(10),                # 性别
            create_time DATETIME DEFAULT CURRENT_TIMESTAMP  # 创建时间(默认当前时间)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor.execute(create_table_sql)
        print("✅ 表创建成功(若不存在)")
        # 4.2 插入数据(单条插入)
        insert_sql = "INSERT INTO user_info (name, age, gender) VALUES (%s, %s, %s)"
        insert_data = ("张三", 25, "男")  # 占位符%s,避免SQL注入(重点!)
        cursor.execute(insert_sql, insert_data)
        connection.commit()  # 插入/更新/删除操作需提交事务
        print(f"✅ 插入数据成功,影响行数:{cursor.rowcount}")
        # 4.3 批量插入数据(效率高于单条插入)
        batch_insert_sql = "INSERT INTO user_info (name, age, gender) VALUES (%s, %s, %s)"
        batch_data = [("李四", 28, "男"), ("王五", 23, "女"), ("赵六", 30, "男")]
        cursor.executemany(batch_insert_sql, batch_data)
        connection.commit()
        print(f"✅ 批量插入数据成功,影响行数:{cursor.rowcount}")
        # 4.4 查询数据(单条查询+多条查询)
        # 单条查询(根据id查询)
        select_one_sql = "SELECT * FROM user_info WHERE id = %s"
        cursor.execute(select_one_sql, (1,))  # 注意:参数需是元组,即使只有一个值
        result_one = cursor.fetchone()  # 获取单条结果(返回元组,无结果返回None)
        print(f"✅ 单条查询结果:{result_one}")
        # 多条查询(查询所有数据)
        select_all_sql = "SELECT * FROM user_info"
        cursor.execute(select_all_sql)
        result_all = cursor.fetchall()  # 获取所有结果(返回列表,每个元素是元组)
        print(f"✅ 所有查询结果(共{len(result_all)}条):")
        for row in result_all:
            print(f"id: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 性别: {row[3]}, 创建时间: {row[4]}")
        # 4.5 更新数据
        update_sql = "UPDATE user_info SET age = %s WHERE name = %s"
        update_data = (26, "张三")
        cursor.execute(update_sql, update_data)
        connection.commit()
        print(f"✅ 更新数据成功,影响行数:{cursor.rowcount}")
        # 4.6 删除数据
        delete_sql = "DELETE FROM user_info WHERE name = %s"
        delete_data = ("赵六",)
        cursor.execute(delete_sql, delete_data)
        connection.commit()
        print(f"✅ 删除数据成功,影响行数:{cursor.rowcount}")
    except Error as e:
        # 异常处理(捕获连接、SQL执行等异常)
        print(f"❌ 数据库操作异常:{e}")
        # 若有事务未提交,回滚(避免数据错乱)
        if connection:
            connection.rollback()
    finally:
        # 5. 关闭资源(先关游标,再关连接,顺序不可颠倒)
        if cursor:
            cursor.close()
            print("✅ 游标已关闭")
        if connection and connection.open:
            connection.close()
            print("✅ 数据库连接已关闭")
# 执行示例
if __name__ == "__main__":
    mysql_demo()

(4)关键注意事项

2. sqlite3(适配SQLite数据库,Python内置)

SQLite是一款「轻量级嵌入式关系型数据库」,无需单独安装数据库服务,数据存储在单个文件中,适合小型项目、本地测试、嵌入式开发等场景。Python内置 sqlite3 库,无需额外安装,可直接使用。

(1)库介绍

(2)安装步骤

无需安装!Python 3.0+版本默认内置,直接导入 sqlite3 即可使用。

(3)核心用法与实战示例

示例场景:创建本地SQLite数据库文件(test_sqlite.db),实现「建表→插入→查询→更新→删除」完整操作,贴合SQLite轻量特性,补充异常处理。

import sqlite3
from sqlite3 import Error
def sqlite_demo():
    # 1. 数据库连接参数(SQLite无需用户名密码,仅需指定数据库文件路径)
    db_path = "test_sqlite.db"  # 数据库文件存储路径(不存在则自动创建)
    # 初始化连接和游标
    connection = None
    cursor = None
    try:
        # 2. 建立数据库连接(SQLite连接是文件连接,不存在则创建文件)
        connection = sqlite3.connect(db_path)
        print("✅ 成功连接到SQLite数据库(文件:test_sqlite.db)")
        # 3. 创建游标对象
        cursor = connection.cursor()
        # 4. 执行操作(建表→插入→查询→更新→删除)
        # 4.1 建表(若表不存在)
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS book_info (
            id INTEGER PRIMARY KEY AUTOINCREMENT,  # SQLite自增用INTEGER,而非INT
            book_name TEXT NOT NULL,               # 书名(非空)
            author TEXT,                           # 作者
            price REAL,                            # 价格(浮点型)
            publish_time TEXT                      # 出版时间(字符串格式)
        );
        """
        cursor.execute(create_table_sql)
        print("✅ 表创建成功(若不存在)")
        # 4.2 插入数据(单条+批量)
        # 单条插入
        insert_sql = "INSERT INTO book_info (book_name, author, price, publish_time) VALUES (?, ?, ?, ?)"
        # SQLite占位符用?,而非%s,注意区别!
        insert_data = ("Python编程:从入门到精通", "张三", 89.9, "2023-01-01")
        cursor.execute(insert_sql, insert_data)
        # 批量插入
        batch_data = [
            ("Java编程思想", "李四", 99.9, "2022-05-01"),
            ("SQL必知必会", "王五", 59.9, "2023-03-15"),
            ("Redis实战", "赵六", 79.9, "2023-06-20")
        ]
        cursor.executemany(insert_sql, batch_data)
        connection.commit()  # SQLite同样需要提交事务
        print(f"✅ 插入数据成功,影响行数:{cursor.rowcount}")
        # 4.3 查询数据
        # 单条查询(根据id)
        select_one_sql = "SELECT * FROM book_info WHERE id = ?"
        cursor.execute(select_one_sql, (1,))
        result_one = cursor.fetchone()
        print(f"✅ 单条查询结果:{result_one}")
        # 多条查询(查询价格>60的书籍)
        select_condition_sql = "SELECT book_name, author, price FROM book_info WHERE price > ?"
        cursor.execute(select_condition_sql, (60,))
        result_condition = cursor.fetchall()
        print(f"✅ 价格>60的书籍(共{len(result_condition)}条):")
        for row in result_condition:
            print(f"书名: {row[0]}, 作者: {row[1]}, 价格: {row[2]}元")
        # 4.4 更新数据
        update_sql = "UPDATE book_info SET price = ? WHERE book_name = ?"
        update_data = (79.9, "Python编程:从入门到精通")
        cursor.execute(update_sql, update_data)
        connection.commit()
        print(f"✅ 更新数据成功,影响行数:{cursor.rowcount}")
        # 4.5 删除数据
        delete_sql = "DELETE FROM book_info WHERE author = ?"
        delete_data = ("赵六",)
        cursor.execute(delete_sql, delete_data)
        connection.commit()
        print(f"✅ 删除数据成功,影响行数:{cursor.rowcount}")
    except Error as e:
        print(f"❌ 数据库操作异常:{e}")
        if connection:
            connection.rollback()
    finally:
        # 关闭资源
        if cursor:
            cursor.close()
            print("✅ 游标已关闭")
        if connection:
            connection.close()
            print("✅ 数据库连接已关闭")
# 执行示例
if __name__ == "__main__":
    sqlite_demo()

(4)关键注意事项

3. psycopg2(适配PostgreSQL数据库)

PostgreSQL是一款功能强大、开源免费的关系型数据库,支持复杂查询、JSON数据类型、事务安全,适合中大型项目、数据分析场景。Python中操作PostgreSQL的主流库是 psycopg2(依赖C扩展,效率高)和 psycopg2-binary(二进制版本,安装简单,无需编译)。

(1)库介绍

(2)安装步骤

推荐安装二进制版本(无需编译,跨平台),执行以下命令:

# 安装psycopg2二进制版本(推荐)
pip install psycopg2-binary
# 安装编译版本(需提前安装PostgreSQL开发环境,不推荐)
# pip install psycopg2

(3)核心用法与实战示例

示例场景:连接本地PostgreSQL数据库(数据库名:test_pg,用户名:postgres,密码:123456),实现核心操作,用法与PyMySQL类似,重点突出差异点。

import psycopg2
from psycopg2 import Error
def postgresql_demo():
    # 1. 数据库连接参数
    db_config = {
        "host": "localhost",
        "port": 5432,          # PostgreSQL默认端口5432(区别于MySQL的3306)
        "user": "postgres",    # PostgreSQL默认用户名postgres
        "password": "123456",
        "database": "test_pg"
    }
    connection = None
    cursor = None
    try:
        # 2. 建立连接
        connection = psycopg2.connect(**db_config)
        cursor = connection.cursor()
        print("✅ 成功连接到PostgreSQL数据库")
        # 3. 建表(支持JSON类型,PostgreSQL特色)
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS student_info (
            id SERIAL PRIMARY KEY,  # PostgreSQL自增用SERIAL
            name VARCHAR(50) NOT NULL,
            age INT,
            info JSONB  # 支持JSONB类型(高效存储JSON数据)
        );
        """
        cursor.execute(create_table_sql)
        print("✅ 表创建成功(若不存在)")
        # 4. 插入数据(支持JSON类型数据)
        insert_sql = "INSERT INTO student_info (name, age, info) VALUES (%s, %s, %s)"
        # PostgreSQL占位符同样是%s,与PyMySQL一致
        insert_data = ("张三", 20, '{"class": "一班", "score": 90}')  # JSON数据传字符串
        cursor.execute(insert_sql, insert_data)
        # 批量插入
        batch_data = [
            ("李四", 21, '{"class": "二班", "score": 85}'),
            ("王五", 19, '{"class": "一班", "score": 95}')
        ]
        cursor.executemany(insert_sql, batch_data)
        connection.commit()
        print(f"✅ 插入数据成功,影响行数:{cursor.rowcount}")
        # 5. 查询数据(支持JSON字段查询,PostgreSQL特色)
        select_sql = "SELECT * FROM student_info WHERE info->>'class' = %s"
        cursor.execute(select_sql, ("一班",))  # 查询class为一班的学生
        result = cursor.fetchall()
        print(f"✅ 一班学生(共{len(result)}条):")
        for row in result:
            print(f"id: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 信息: {row[3]}")
        # 6. 更新、删除操作(与PyMySQL一致)
        update_sql = "UPDATE student_info SET age = %s WHERE name = %s"
        cursor.execute(update_sql, (22, "张三"))
        connection.commit()
        delete_sql = "DELETE FROM student_info WHERE name = %s"
        cursor.execute(delete_sql, ("李四",))
        connection.commit()
    except Error as e:
        print(f"❌ 数据库操作异常:{e}")
        if connection:
            connection.rollback()
    finally:
        # 关闭资源
        if cursor:
            cursor.close()
        if connection:
            connection.close()
            print("✅ 数据库连接已关闭")
if __name__ == "__main__":
    postgresql_demo()

(4)关键注意事项

三、非关系型数据库操作库(重点)

非关系型数据库(NoSQL)无需固定表结构,支持灵活的数据存储(如JSON、键值对),适合大数据量、高并发、数据结构不固定的场景。以下讲解Python中最常用的两个NoSQL数据库操作库:MongoDB(文档型)和Redis(键值对型)。

1. pymongo(适配MongoDB数据库)

MongoDB是一款主流的文档型NoSQL数据库,数据以JSON格式(BSON)存储,无需固定表结构,灵活易用,适合Web开发、日志存储、数据分析等场景。Python中操作MongoDB的主流库是 pymongo(MongoDB官方推荐)。

(1)库介绍

(2)安装步骤

# 安装pymongo
pip install pymongo
# 若需支持异步操作(如FastAPI项目),安装motor(pymongo的异步版本)
# pip install motor

(3)核心用法与实战示例

MongoDB核心概念(与关系型数据库对应):数据库(Database)→ 集合(Collection,对应表)→ 文档(Document,对应行),无需提前建库、建集合(插入文档时自动创建)。

示例场景:连接本地MongoDB数据库(默认端口27017,无用户名密码),实现「插入文档→查询文档→更新文档→删除文档」完整操作。

from pymongo import MongoClient
from pymongo.errors import PyMongoError
def mongodb_demo():
    # 1. 数据库连接参数
    mongo_url = "mongodb://localhost:27017/"  # MongoDB默认地址+端口
    db_name = "test_mongo"  # 数据库名(不存在则自动创建)
    collection_name = "user"  # 集合名(对应关系型数据库的表,不存在则自动创建)
    client = None
    try:
        # 2. 建立连接(创建MongoClient对象)
        client = MongoClient(mongo_url)
        print("✅ 成功连接到MongoDB数据库")
        # 3. 获取数据库对象
        db = client[db_name]
        # 4. 获取集合对象(对应表)
        collection = db[collection_name]
        # 5. 执行操作(插入→查询→更新→删除)
        # 5.1 插入文档(单条+批量)
        # 单条插入(文档是字典格式,对应JSON)
        single_doc = {
            "name": "张三",
            "age": 25,
            "gender": "男",
            "info": {"city": "北京", "job": "程序员"}
        }
        insert_result = collection.insert_one(single_doc)
        print(f"✅ 单条插入成功,文档ID:{insert_result.inserted_id}")
        # 批量插入(传入列表,每个元素是字典)
        batch_docs = [
            {"name": "李四", "age": 28, "gender": "男", "info": {"city": "上海", "job": "产品经理"}},
            {"name": "王五", "age": 23, "gender": "女", "info": {"city": "广州", "job": "设计师"}},
            {"name": "赵六", "age": 30, "gender": "男", "info": {"city": "深圳", "job": "测试工程师"}}
        ]
        batch_result = collection.insert_many(batch_docs)
        print(f"✅ 批量插入成功,文档IDs:{batch_result.inserted_ids}")
        # 5.2 查询文档(单条+多条+条件查询)
        # 单条查询(根据条件查询,find_one返回第一个匹配文档)
        single_query = collection.find_one({"name": "张三"})
        print(f"✅ 单条查询结果:{single_query}")
        # 多条查询(查询所有gender为男的文档,返回游标,需遍历)
        multi_query = collection.find({"gender": "男"})
        print("✅ 所有男性用户:")
        for doc in multi_query:
            print(doc)
        # 条件查询(查询age>25的文档,只返回name和age字段,隐藏_id)
        condition_query = collection.find(
            {"age": {"$gt": 25}},  # 条件:age>25(MongoDB条件语法)
            {"name": 1, "age": 1, "_id": 0}  # 1:显示,0:隐藏
        )
        print("✅ 年龄>25的用户(仅显示姓名和年龄):")
        for doc in condition_query:
            print(doc)
        # 5.3 更新文档(单条+多条)
        # 单条更新(更新name为张三的文档,将age改为26)
        update_single = collection.update_one(
            {"name": "张三"},  # 查询条件
            {"$set": {"age": 26, "info.city": "杭州"}}  # 更新操作:$set表示修改指定字段
        )
        print(f"✅ 单条更新成功,匹配行数:{update_single.matched_count},修改行数:{update_single.modified_count}")
        # 多条更新(更新所有gender为男的文档,将age加1)
        update_multi = collection.update_many(
            {"gender": "男"},
            {"$inc": {"age": 1}}  # $inc表示自增
        )
        print(f"✅ 多条更新成功,匹配行数:{update_multi.matched_count},修改行数:{update_multi.modified_count}")
        # 5.4 删除文档(单条+多条)
        # 单条删除(删除name为赵六的文档)
        delete_single = collection.delete_one({"name": "赵六"})
        print(f"✅ 单条删除成功,删除行数:{delete_single.deleted_count}")
        # 多条删除(删除age>30的文档)
        delete_multi = collection.delete_many({"age": {"$gt": 30}})
        print(f"✅ 多条删除成功,删除行数:{delete_multi.deleted_count}")
    except PyMongoError as e:
        print(f"❌ MongoDB操作异常:{e}")
    finally:
        # 关闭连接
        if client:
            client.close()
            print("✅ MongoDB连接已关闭")
if __name__ == "__main__":
    mongodb_demo()

(4)关键注意事项

2. redis-py(适配Redis数据库)

Redis是一款高性能的键值对型NoSQL数据库,数据存储在内存中,支持多种数据结构(字符串、列表、哈希、集合、有序集合),适合缓存、会话存储、计数器、消息队列等场景。Python中操作Redis的主流库是redis-py(Redis官方推荐)。

(1)库介绍

(2)安装步骤

# 安装redis-py(推荐最新版本)
pip install redis
# 若需异步操作(如异步Web项目),安装aioredis
# pip install aioredis

(3)核心用法与实战示例

Redis核心是「键值对」,支持多种数据结构,每种数据结构有对应的操作方法。示例场景:连接本地Redis数据库(默认端口6379,无用户名密码),演示常用数据结构的核心操作。

import redis
from redis.exceptions import RedisError
def redis_demo():
    # 1. 数据库连接参数
    redis_config = {
        "host": "localhost",
        "port": 6379,          # Redis默认端口6379
        "password": "",        # 若有密码,传入密码字符串
        "db": 0,               # 选择数据库(Redis默认有16个数据库,编号0-15)
        "decode_responses": True  # 关键:设置为True,返回字符串(默认返回bytes类型)
    }
    r = None
    try:
        # 2. 建立连接(创建Redis对象,推荐使用连接池)
        # 方式1:直接创建连接(简单场景)
        r = redis.Redis(**redis_config)
        # 方式2:使用连接池(生产环境推荐,避免频繁创建/关闭连接)
        # pool = redis.ConnectionPool(**redis_config)
        # r = redis.Redis(connection_pool=pool)
        # 验证连接(ping()返回True表示连接成功)
        if r.ping():
            print("✅ 成功连接到Redis数据库")
        # 3. 执行操作(常用数据结构:字符串、哈希、列表、集合)
        # 3.1 字符串类型(String):最基础的键值对,适合缓存、计数器
        # 设置值(key=name, value=张三,过期时间60秒)
        r.set("name", "张三", ex=60)
        # 获取值
        name = r.get("name")
        print(f"✅ 字符串查询:name = {name}")
        # 自增(计数器)
        r.incr("count")  # count从0变为1
        r.incr("count", 2)  # count从1变为3
        count = r.get("count")
        print(f"✅ 计数器:count = {count}")
        # 删除值
        r.delete("count")
        print(f"✅ 字符串删除:count是否存在 = {r.exists('count')}")  # exists返回0表示不存在
        # 3.2 哈希类型(Hash):适合存储对象(如用户信息),键值对嵌套
        # 设置哈希值(key=user:1,字段name、age、gender)
        r.hset("user:1", mapping={
            "name": "李四",
            "age": 28,
            "gender": "男"
        })
        # 获取单个字段值
        user_name = r.hget("user:1", "name")
        print(f"✅ 哈希查询(单个字段):user:1.name = {user_name}")
        # 获取所有字段和值
        user_all = r.hgetall("user:1")
        print(f"✅ 哈希查询(所有字段):user:1 = {user_all}")
        # 获取所有字段名
        user_fields = r.hkeys("user:1")
        print(f"✅ 哈希查询(所有字段名):user:1.fields = {user_fields}")
        # 3.3 列表类型(List):有序、可重复,适合消息队列、排行榜
        # 向列表左侧添加值
        r.lpush("book_list", "Python编程", "Java编程", "SQL必知必会")
        # 向列表右侧添加值
        r.rpush("book_list", "Redis实战")
        # 获取列表长度
        book_len = r.llen("book_list")
        print(f"✅ 列表长度:book_list.length = {book_len}")
        # 获取列表所有值(0表示第一个,-1表示最后一个)
        book_all = r.lrange("book_list", 0, -1)
        print(f"✅ 列表查询(所有值):book_list = {book_all}")
        # 弹出列表左侧值(消息队列常用)
        book_pop = r.lpop("book_list")
        print(f"✅ 列表弹出(左侧):弹出值 = {book_pop},剩余列表 = {r.lrange('book_list', 0, -1)}")
        # 3.4 集合类型(Set):无序、不可重复,适合去重、交集/并集查询
        # 向集合添加值
        r.sadd("user_set", "张三", "李四", "王五", "张三")  # 重复值会自动去重
        # 获取集合所有值
        user_set_all = r.smembers("user_set")
        print(f"✅ 集合查询(所有值):user_set = {user_set_all}")
        # 判断值是否在集合中
        is_exist = r.sismember("user_set", "张三")
        print(f"✅ 集合判断:张三是否在user_set中 = {is_exist}")
        # 获取集合长度
        user_set_len = r.scard("user_set")
        print(f"✅ 集合长度:user_set.length = {user_set_len}")
        # 3.5 事务操作(Redis事务:批量执行,要么全部成功,要么全部失败)
        pipe = r.pipeline()  # 创建管道(事务载体)
        pipe.set("tx1", "test1")
        pipe.set("tx2", "test2")
        pipe.incr("tx3")
        pipe.execute()  # 执行事务
        print(f"✅ 事务执行成功:tx1 = {r.get('tx1')}, tx2 = {r.get('tx2')}, tx3 = {r.get('tx3')}")
    except RedisError as e:
        print(f"❌ Redis操作异常:{e}")
    finally:
        # 关闭连接(使用连接池时,无需手动关闭,连接池会自动管理)
        if r:
            r.close()
            print("✅ Redis连接已关闭")
if __name__ == "__main__":
    redis_demo()

(4)关键注意事项

四、通用进阶技巧(生产环境必备)

无论使用哪种数据库、哪种操作库,以下进阶技巧都是生产环境必备,可提升脚本稳定性、效率和安全性,统一梳理如下:

1. 异常处理全覆盖

必须捕获所有可能的异常(如连接失败、SQL错误、Redis超时等),避免程序崩溃;同时在异常时做兜底处理(如事务回滚、日志记录),便于问题排查。

2. 连接池管理

生产环境中,频繁创建/关闭数据库连接会严重影响效率,所有库都支持连接池(PyMySQL用DBUtils,pymongo用MongoClient内置连接池,redis用ConnectionPool),需合理配置连接池参数(最大连接数、超时时间等)。

3. 避免SQL注入(关系型数据库)

所有关系型数据库(MySQL、PostgreSQL、SQLite)的SQL语句,必须使用占位符传递参数,不可直接拼接SQL字符串,避免SQL注入漏洞(重点!)。

4. 日志记录

在数据库操作的关键节点(连接、执行SQL、异常)添加日志记录(使用Python内置logging库),便于后续排查问题(如定位SQL执行失败的原因、连接超时的场景)。

5. 数据序列化(非关系型数据库)

MongoDB、Redis存储复杂数据(字典、列表)时,需先序列化(如json.dumps()),读取时反序列化,确保数据完整性。

6. 批量操作优先

插入、更新、删除大量数据时,优先使用批量操作(如executemany、insert_many、hmset),避免单条循环操作,提升执行效率(批量操作可减少网络交互次数)。

7. 权限与安全

生产环境中,数据库必须设置密码、限制访问IP;敏感数据(如密码、数据库地址)不可硬编码在代码中,需使用配置文件(如config.ini)或环境变量存储。

五、总结

Python中数据库操作库丰富多样,核心可分为「关系型数据库库」和「非关系型数据库库」,选择时需遵循「场景适配」原则,总结如下:

1. 库选型建议

2. 核心要点

Python数据库操作库的核心是「简化与数据库的交互」,无需深入理解数据库底层原理,只需掌握对应库的API和选型原则,就能高效完成数据库操作。后续可根据具体项目场景,深入学习某一个库的高级特性(如MongoDB聚合查询、Redis管道、PostgreSQL JSON操作),进一步提升开发效率。

到此这篇关于Python中数据库操作库详解的文章就介绍到这了,更多相关Python数据库操作库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文