python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > PyMySQL数据库连接与优化

PyMySQL数据库连接与优化方式

作者:AI手记叨叨礼拜天

本文介绍PyMySQL连接MySQL数据库的方法,涵盖CRUD操作、事务处理及连接池优化,通过合理配置连接池和错误重试机制,提升性能并确保数据一致性

本文将介绍如何使用 PyMySQL 连接和操作 MySQL 数据库,包括基本连接CRUD 操作事务处理以及如何在高并发环境下使用连接池优化性能。

通过合理的连接池配置和错误处理机制,可以构建出稳定高效的数据库应用。

一、PyMySQL 简介

PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,用于连接和操作 MySQL 数据库。它完全兼容 Python DB API 2.0 规范,提供了简单易用的接口来执行 SQL 查询和操作。

核心优势

安装方法

pip install pymysql

二、数据库连接配置

基础连接方式

import pymysql
from pymysql.cursors import DictCursor

# 推荐配置方式
def create_connection():
    return pymysql.connect(
        host='localhost',      # 数据库地址
        user='username',       # 用户名
        password='password',   # 密码
        database='test_db',    # 数据库名
        port=3306,            # 端口,默认3306
        charset='utf8mb4',     # 字符集,推荐utf8mb4
        autocommit=False,     # 是否自动提交
        cursorclass=DictCursor # 返回字典格式结果
    )
	

连接参数说明

参数说明
host数据库服务器地址‘localhost’
user用户名根据实际配置
password密码根据实际配置
database数据库名称项目数据库名
charset字符编码‘utf8mb4’(支持表情符号)
autocommit自动提交事务False(建议手动控制)
cursorclass游标类型DictCursor(结果以字典返回)

cursorclass参数说明

cursorclass说明返回结果格式适用场景
Cursor (默认)普通游标元组格式 (value1, value2, …)基础查询,需要最高性能时
DictCursor字典游标字典格式 {‘column’: value}需要按列名访问数据时
SSCursor无缓冲游标元组格式,流式读取处理大量数据,内存有限时
SSDictCursor无缓冲字典游标字典格式,流式读取大量数据且需要按列名访问
Cursor 子类自定义游标自定义格式特殊数据处理需求

完整连接示例

import pymysql
from pymysql.cursors import DictCursor

def get_db_connection():
    """获取数据库连接"""
    return pymysql.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase',
        charset='utf8mb4',
        autocommit=False,
        cursorclass=DictCursor,
        connect_timeout=10  # 连接超时10秒
    )

# 使用示例
def test_connection():
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1 as test")
            result = cursor.fetchone()
            print("连接测试成功:", result)
    finally:
        conn.close()

test_connection()

输出:

连接测试成功: {'test': 1}

三、数据库基础操作

创建示例数据表

CREATE TABLE mydb.users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

相关说明

关键字类型说明
INT数据类型整数类型,用于存储整数值
AUTO_INCREMENT约束/属性自动递增,每次插入新记录时自动生成唯一ID
PRIMARY KEY约束主键,唯一标识每条记录
VARCHAR(100)数据类型可变长度字符串,最大100字符
NOT NULL约束该字段不能为空,必须包含值
UNIQUE约束确保每个值唯一,不允许重复
TIMESTAMP数据类型时间戳类型,用于存储日期和时间
DEFAULT CURRENT_TIMESTAMP默认值默认值为当前系统时间

数据库操作封装类

import pymysql
from pymysql.cursors import DictCursor
from typing import List, Dict, Any, Optional, Tuple

class MySQLManager:
    """MySQL 数据库管理类"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
        """执行查询语句(SELECT)"""
        conn = pymysql.connect(**self.config)
        try:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
        finally:
            conn.close()

    def execute_update(self, sql: str, params: Tuple = None) -> int:
        """执行更新语句(INSERT/UPDATE/DELETE)"""
        conn = pymysql.connect(**self.config)
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params or ())
                conn.commit()
                return affected_rows
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()			

CRUD 操作示例

操作英文中文对应 SQL描述
CCreate创建INSERT创建新记录
RRead读取SELECT查询/读取数据
UUpdate更新UPDATE修改现有记录
DDelete删除DELETE删除记录
# 数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'cursorclass': DictCursor 
}

db = MySQLManager(db_config)

# 1. 插入数据
def add_user(name: str, email: str, age: int) -> int:
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    return db.execute_update(sql, (name, email, age))

# 2. 查询数据
def get_all_users() -> List[Dict]:
    return db.execute_query("SELECT * FROM users")

# 3. 更新数据
def update_user_email(user_id: int, new_email: str) -> int:
    sql = "UPDATE users SET email = %s WHERE id = %s"
    return db.execute_update(sql, (new_email, user_id))

# 4. 删除数据
def delete_user(user_id: int) -> int:
    return db.execute_update("DELETE FROM users WHERE id = %s", (user_id,))
	
if __name__ == '__main__':
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    user_id = add_user("张三", "zhangsan@example.com", 25)
    print(f"执行:插入新用户")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    user_id = users[0]['id']
    affected_rows = update_user_email(user_id, "zhangsan2@example.com")
    print(f"执行:更新邮箱,影响行数: {affected_rows}")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    affected_rows = delete_user(user_id)
    print(f"执行:删除用户,影响行数: {affected_rows}")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")

输出:

查询所有用户: ()
执行:插入新用户
查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
执行:更新邮箱,影响行数: 1
查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan2@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
执行:删除用户,影响行数: 1
查询所有用户: ()

事务处理示例

模拟简单的转账操作,从一个用户账户转移到另一个用户账户。

def transfer_points(sender_id: int, receiver_id: int, points: int) -> bool:
    """转账操作(事务示例)"""
    conn = pymysql.connect(**db_config)
    try:
        with conn.cursor(DictCursor) as cursor:
            # 检查发送者余额
            cursor.execute("SELECT points FROM accounts WHERE user_id = %s", (sender_id,))
            sender = cursor.fetchone()
            
            if not sender or sender['points'] < points:
                raise ValueError("余额不足")
            
            # 执行转账
            cursor.execute("UPDATE accounts SET points = points - %s WHERE user_id = %s", 
                         (points, sender_id))
            cursor.execute("UPDATE accounts SET points = points + %s WHERE user_id = %s", 
                         (points, receiver_id))
            
            conn.commit()
            return True
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

批量操作

def batch_insert_users(users: List[tuple]) -> int:
    """批量插入用户数据"""
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    conn = pymysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            affected_rows = cursor.executemany(sql, users)
            conn.commit()
            return affected_rows
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

# 使用示例
users_data = [
    ('张三', 'zhangsan@example.com', 25),
    ('李四', 'lisi@example.com', 30)
]
batch_insert_users(users_data)

四、连接池优化

为什么需要连接池

频繁创建和关闭数据库连接会导致:

使用 DBUtils 实现连接池

安装方法

pip install DBUtils

实现示例

from dbutils.pooled_db import PooledDB
import pymysql
import threading
from typing import List, Dict, Any, Tuple
from pymysql.cursors import DictCursor

class ConnectionPool:
    """数据库连接池"""
    
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, config: Dict[str, Any]):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.pool_config = config.copy()
                cls._instance._pool = PooledDB(
                    creator=pymysql,
                    maxconnections=20,  # 最大连接数
                    mincached=2,  # 初始空闲连接
                    maxcached=10,  # 最大空闲连接
                    blocking=True,  # 连接耗尽时等待
                    ping=1,  # 使用时检查连接
                    **config
                )
        return cls._instance
            
    def get_connection(self):
        """从连接池获取连接"""
        return self._pool.connection()

# 使用连接池的数据库管理器
class PooledDBManager:
    def __init__(self, pool_config: Dict[str, Any]):
        self.pool = ConnectionPool(pool_config)
    
    def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
        """执行查询"""
        conn = self.pool.get_connection()
        try:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
        finally:
            conn.close()  # 实际是放回连接池
    
    def execute_update(self, sql: str, params: Tuple = None) -> int:
        """执行更新"""
        conn = self.pool.get_connection()
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params or ())
                conn.commit()
                return affected_rows
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()

ping 参数说明

0 = 不检查
1 = 每次请求时检查(推荐)
2 = 每次游标创建时检查
4 = 每次执行时检查
7 = 1+2+4(所有检查)

五、应用示例

Flask 集成示例

from dbutils.pooled_db import PooledDB
from flask import Flask, request, jsonify
from pymysql.cursors import DictCursor

app = Flask(__name__)

db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# 初始化连接池
db_manager = PooledDBManager(db_config)

@app.route('/users', methods=['GET'])
def get_users():
    """获取所有用户"""
    try:
        users = db_manager.execute_query("SELECT * FROM users")
        return jsonify({'success': True, 'data': users})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/users', methods=['POST'])
def create_user():
    """创建用户"""
    try:
        data = request.json
        sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
        result = db_manager.execute_update(sql, (data['name'], data['email'], data['age']))
        return jsonify({'success': True, 'affected_rows': result})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

连接池实践配置

# 优化后的连接池配置
optimal_pool_config = {
    'maxconnections': 20,      # 根据并发量调整
    'mincached': 2,           # 减少初始资源占用
    'maxcached': 10,          # 控制最大空闲连接
    'blocking': True,         # 避免连接耗尽错误
    'ping': 1,                # 使用前检查连接健康
    **db_config              # 基础数据库配置
}

错误重试机制

数据库操作重试装饰器:当数据库连接出现临时故障时,会自动进行最多3次重试,并且每次重试间隔时间按指数增长(1秒、2秒、4秒),提高程序的容错能力。

import time
from functools import wraps
import pymysql

def retry_on_failure(max_retries=3, initial_delay=1):
    """数据库操作重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (pymysql.OperationalError, pymysql.InterfaceError) as e:
                    if attempt == max_retries - 1:
                        raise e
                    time.sleep(initial_delay * (2 ** attempt))  # 指数退避
            return None
        return wrapper
    return decorator

# 使用示例
@retry_on_failure(max_retries=3)
def robust_query(sql, params=None):
    return db_manager.execute_query(sql, params)

指数退避:当操作失败时,不立即重试,而是等待一段时间,且每次重试的等待时间呈指数级增长。等待 1 秒, 2 秒, 4 秒,8 秒…

六、SQL事务操作对比

事务影响

操作类型语法示例主要用途返回值事务影响性能考虑使用场景
SELECT
(查询)
SELECT * FROM users WHERE age > 18;从数据库中检索数据结果集(0行或多行)只读操作,不影响数据索引优化很重要,避免全表扫描数据查询、报表生成、数据分析
UPDATE
(更新)
UPDATE users SET age = 20 WHERE id = 1;修改现有记录受影响的行数需要事务控制,会锁定行WHERE 条件要精确,避免锁表修改用户信息、更新状态、调整数值
INSERT
(插入)
INSERT INTO users (name, age) VALUES (‘张三’, 25);添加新记录插入的行数(通常是1)需要事务控制批量插入比单条插入高效新增用户、创建订单、记录日志
DELETE
(删除)
DELETE FROM users WHERE id = 1;删除记录受影响的行数需要事务控制,谨慎使用建议软删除,避免物理删除删除用户、清理数据、撤销操作

事务特性

操作是否自动提交锁级别回滚支持并发影响
SELECT是(可设置)共享锁可回滚到快照低(读写不阻塞)
UPDATE排他锁完全支持高(会阻塞其他写操作)
INSERT排他锁完全支持中(可能触发索引重建)
DELETE排他锁完全支持高(会阻塞其他操作)

普通 SELECT 是完全无锁的,不会阻塞其他事务的写操作,也不会被写操作阻塞。只有显式加锁的SELECT才会影响并发。

七、总结

连接管理

事务控制

错误处理

性能优化

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文