PyMySQL数据库连接与优化方式
作者:AI手记叨叨礼拜天
本文介绍PyMySQL连接MySQL数据库的方法,涵盖CRUD操作、事务处理及连接池优化,通过合理配置连接池和错误重试机制,提升性能并确保数据一致性
本文将介绍如何使用 PyMySQL 连接和操作 MySQL 数据库,包括基本连接、CRUD 操作、事务处理以及如何在高并发环境下使用连接池优化性能。
通过合理的连接池配置和错误处理机制,可以构建出稳定高效的数据库应用。
一、PyMySQL 简介
PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,用于连接和操作 MySQL 数据库。它完全兼容 Python DB API 2.0 规范,提供了简单易用的接口来执行 SQL 查询和操作。
核心优势
- 纯 Python 实现:无需外部依赖,跨平台兼容性好
- Python 3 全面支持:兼容最新 Python 特性和语法
- 线程安全:支持多线程并发操作
- 完整功能支持:事务、存储过程、预处理语句等
- 广泛兼容:支持 MySQL 5.5+ 和 MariaDB
安装方法
pip install pymysql
二、数据库连接配置
基础连接方式
import pymysql from pymysql.cursors import DictCursor # 推荐配置方式 def create_connection(): return pymysql.connect( host='localhost', # 数据库地址 user='username', # 用户名 password='password', # 密码 database='test_db', # 数据库名 port=3306, # 端口,默认3306 charset='utf8mb4', # 字符集,推荐utf8mb4 autocommit=False, # 是否自动提交 cursorclass=DictCursor # 返回字典格式结果 )
连接参数说明
参数 | 说明 | 值 |
---|---|---|
host | 数据库服务器地址 | ‘localhost’ |
user | 用户名 | 根据实际配置 |
password | 密码 | 根据实际配置 |
database | 数据库名称 | 项目数据库名 |
charset | 字符编码 | ‘utf8mb4’(支持表情符号) |
autocommit | 自动提交事务 | False(建议手动控制) |
cursorclass | 游标类型 | DictCursor(结果以字典返回) |
cursorclass参数说明
cursorclass | 说明 | 返回结果格式 | 适用场景 |
---|---|---|---|
Cursor (默认) | 普通游标 | 元组格式 (value1, value2, …) | 基础查询,需要最高性能时 |
DictCursor | 字典游标 | 字典格式 {‘column’: value} | 需要按列名访问数据时 |
SSCursor | 无缓冲游标 | 元组格式,流式读取 | 处理大量数据,内存有限时 |
SSDictCursor | 无缓冲字典游标 | 字典格式,流式读取 | 大量数据且需要按列名访问 |
Cursor 子类 | 自定义游标 | 自定义格式 | 特殊数据处理需求 |
完整连接示例
import pymysql from pymysql.cursors import DictCursor def get_db_connection(): """获取数据库连接""" return pymysql.connect( host='localhost', user='myuser', password='mypassword', database='mydatabase', charset='utf8mb4', autocommit=False, cursorclass=DictCursor, connect_timeout=10 # 连接超时10秒 ) # 使用示例 def test_connection(): conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT 1 as test") result = cursor.fetchone() print("连接测试成功:", result) finally: conn.close() test_connection()
输出:
连接测试成功: {'test': 1}
三、数据库基础操作
创建示例数据表
CREATE TABLE mydb.users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, email VARCHAR(100) UNIQUE NOT NULL, age INT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
相关说明
关键字 | 类型 | 说明 |
---|---|---|
INT | 数据类型 | 整数类型,用于存储整数值 |
AUTO_INCREMENT | 约束/属性 | 自动递增,每次插入新记录时自动生成唯一ID |
PRIMARY KEY | 约束 | 主键,唯一标识每条记录 |
VARCHAR(100) | 数据类型 | 可变长度字符串,最大100字符 |
NOT NULL | 约束 | 该字段不能为空,必须包含值 |
UNIQUE | 约束 | 确保每个值唯一,不允许重复 |
TIMESTAMP | 数据类型 | 时间戳类型,用于存储日期和时间 |
DEFAULT CURRENT_TIMESTAMP | 默认值 | 默认值为当前系统时间 |
数据库操作封装类
import pymysql from pymysql.cursors import DictCursor from typing import List, Dict, Any, Optional, Tuple class MySQLManager: """MySQL 数据库管理类""" def __init__(self, config: Dict[str, Any]): self.config = config def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]: """执行查询语句(SELECT)""" conn = pymysql.connect(**self.config) try: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, params or ()) return cursor.fetchall() finally: conn.close() def execute_update(self, sql: str, params: Tuple = None) -> int: """执行更新语句(INSERT/UPDATE/DELETE)""" conn = pymysql.connect(**self.config) try: with conn.cursor() as cursor: affected_rows = cursor.execute(sql, params or ()) conn.commit() return affected_rows except Exception as e: conn.rollback() raise e finally: conn.close()
CRUD 操作示例
操作 | 英文 | 中文 | 对应 SQL | 描述 |
---|---|---|---|---|
C | Create | 创建 | INSERT | 创建新记录 |
R | Read | 读取 | SELECT | 查询/读取数据 |
U | Update | 更新 | UPDATE | 修改现有记录 |
D | Delete | 删除 | DELETE | 删除记录 |
# 数据库配置 db_config = { 'host': 'localhost', 'user': 'root', 'password': 'password', 'database': 'test_db', 'charset': 'utf8mb4', 'cursorclass': DictCursor } db = MySQLManager(db_config) # 1. 插入数据 def add_user(name: str, email: str, age: int) -> int: sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)" return db.execute_update(sql, (name, email, age)) # 2. 查询数据 def get_all_users() -> List[Dict]: return db.execute_query("SELECT * FROM users") # 3. 更新数据 def update_user_email(user_id: int, new_email: str) -> int: sql = "UPDATE users SET email = %s WHERE id = %s" return db.execute_update(sql, (new_email, user_id)) # 4. 删除数据 def delete_user(user_id: int) -> int: return db.execute_update("DELETE FROM users WHERE id = %s", (user_id,)) if __name__ == '__main__': users = get_all_users() print(f"查询所有用户: {users}") user_id = add_user("张三", "zhangsan@example.com", 25) print(f"执行:插入新用户") users = get_all_users() print(f"查询所有用户: {users}") user_id = users[0]['id'] affected_rows = update_user_email(user_id, "zhangsan2@example.com") print(f"执行:更新邮箱,影响行数: {affected_rows}") users = get_all_users() print(f"查询所有用户: {users}") affected_rows = delete_user(user_id) print(f"执行:删除用户,影响行数: {affected_rows}") users = get_all_users() print(f"查询所有用户: {users}")
输出:
查询所有用户: () 执行:插入新用户 查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}] 执行:更新邮箱,影响行数: 1 查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan2@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}] 执行:删除用户,影响行数: 1 查询所有用户: ()
事务处理示例
模拟简单的转账操作,从一个用户账户转移到另一个用户账户。
def transfer_points(sender_id: int, receiver_id: int, points: int) -> bool: """转账操作(事务示例)""" conn = pymysql.connect(**db_config) try: with conn.cursor(DictCursor) as cursor: # 检查发送者余额 cursor.execute("SELECT points FROM accounts WHERE user_id = %s", (sender_id,)) sender = cursor.fetchone() if not sender or sender['points'] < points: raise ValueError("余额不足") # 执行转账 cursor.execute("UPDATE accounts SET points = points - %s WHERE user_id = %s", (points, sender_id)) cursor.execute("UPDATE accounts SET points = points + %s WHERE user_id = %s", (points, receiver_id)) conn.commit() return True except Exception as e: conn.rollback() raise e finally: conn.close()
批量操作
def batch_insert_users(users: List[tuple]) -> int: """批量插入用户数据""" sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)" conn = pymysql.connect(**db_config) try: with conn.cursor() as cursor: affected_rows = cursor.executemany(sql, users) conn.commit() return affected_rows except Exception as e: conn.rollback() raise e finally: conn.close() # 使用示例 users_data = [ ('张三', 'zhangsan@example.com', 25), ('李四', 'lisi@example.com', 30) ] batch_insert_users(users_data)
四、连接池优化
为什么需要连接池
频繁创建和关闭数据库连接会导致:
- 资源浪费(TCP 连接建立开销)
- 性能下降(连接初始化时间)
- 连接数耗尽(超过数据库最大连接数)
连接池通过复用连接解决这些问题。
使用 DBUtils 实现连接池
安装方法
pip install DBUtils
实现示例
from dbutils.pooled_db import PooledDB import pymysql import threading from typing import List, Dict, Any, Tuple from pymysql.cursors import DictCursor class ConnectionPool: """数据库连接池""" _instance = None _lock = threading.Lock() def __new__(cls, config: Dict[str, Any]): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.pool_config = config.copy() cls._instance._pool = PooledDB( creator=pymysql, maxconnections=20, # 最大连接数 mincached=2, # 初始空闲连接 maxcached=10, # 最大空闲连接 blocking=True, # 连接耗尽时等待 ping=1, # 使用时检查连接 **config ) return cls._instance def get_connection(self): """从连接池获取连接""" return self._pool.connection() # 使用连接池的数据库管理器 class PooledDBManager: def __init__(self, pool_config: Dict[str, Any]): self.pool = ConnectionPool(pool_config) def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]: """执行查询""" conn = self.pool.get_connection() try: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, params or ()) return cursor.fetchall() finally: conn.close() # 实际是放回连接池 def execute_update(self, sql: str, params: Tuple = None) -> int: """执行更新""" conn = self.pool.get_connection() try: with conn.cursor() as cursor: affected_rows = cursor.execute(sql, params or ()) conn.commit() return affected_rows except Exception as e: conn.rollback() raise e finally: conn.close()
ping 参数说明
0 = 不检查 1 = 每次请求时检查(推荐) 2 = 每次游标创建时检查 4 = 每次执行时检查 7 = 1+2+4(所有检查)
五、应用示例
Flask 集成示例
from dbutils.pooled_db import PooledDB from flask import Flask, request, jsonify from pymysql.cursors import DictCursor app = Flask(__name__) db_config = { 'host': 'localhost', 'user': 'root', 'password': 'password', 'database': 'test_db', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 初始化连接池 db_manager = PooledDBManager(db_config) @app.route('/users', methods=['GET']) def get_users(): """获取所有用户""" try: users = db_manager.execute_query("SELECT * FROM users") return jsonify({'success': True, 'data': users}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/users', methods=['POST']) def create_user(): """创建用户""" try: data = request.json sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)" result = db_manager.execute_update(sql, (data['name'], data['email'], data['age'])) return jsonify({'success': True, 'affected_rows': result}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 if __name__ == '__main__': app.run(debug=True)
连接池实践配置
# 优化后的连接池配置 optimal_pool_config = { 'maxconnections': 20, # 根据并发量调整 'mincached': 2, # 减少初始资源占用 'maxcached': 10, # 控制最大空闲连接 'blocking': True, # 避免连接耗尽错误 'ping': 1, # 使用前检查连接健康 **db_config # 基础数据库配置 }
错误重试机制
数据库操作重试装饰器:当数据库连接出现临时故障时,会自动进行最多3次重试,并且每次重试间隔时间按指数增长(1秒、2秒、4秒),提高程序的容错能力。
import time from functools import wraps import pymysql def retry_on_failure(max_retries=3, initial_delay=1): """数据库操作重试装饰器""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except (pymysql.OperationalError, pymysql.InterfaceError) as e: if attempt == max_retries - 1: raise e time.sleep(initial_delay * (2 ** attempt)) # 指数退避 return None return wrapper return decorator # 使用示例 @retry_on_failure(max_retries=3) def robust_query(sql, params=None): return db_manager.execute_query(sql, params)
指数退避:当操作失败时,不立即重试,而是等待一段时间,且每次重试的等待时间呈指数级增长。等待 1 秒, 2 秒, 4 秒,8 秒…
六、SQL事务操作对比
事务影响
操作类型 | 语法示例 | 主要用途 | 返回值 | 事务影响 | 性能考虑 | 使用场景 |
---|---|---|---|---|---|---|
SELECT (查询) | SELECT * FROM users WHERE age > 18; | 从数据库中检索数据 | 结果集(0行或多行) | 只读操作,不影响数据 | 索引优化很重要,避免全表扫描 | 数据查询、报表生成、数据分析 |
UPDATE (更新) | UPDATE users SET age = 20 WHERE id = 1; | 修改现有记录 | 受影响的行数 | 需要事务控制,会锁定行 | WHERE 条件要精确,避免锁表 | 修改用户信息、更新状态、调整数值 |
INSERT (插入) | INSERT INTO users (name, age) VALUES (‘张三’, 25); | 添加新记录 | 插入的行数(通常是1) | 需要事务控制 | 批量插入比单条插入高效 | 新增用户、创建订单、记录日志 |
DELETE (删除) | DELETE FROM users WHERE id = 1; | 删除记录 | 受影响的行数 | 需要事务控制,谨慎使用 | 建议软删除,避免物理删除 | 删除用户、清理数据、撤销操作 |
事务特性
操作 | 是否自动提交 | 锁级别 | 回滚支持 | 并发影响 |
---|---|---|---|---|
SELECT | 是(可设置) | 共享锁 | 可回滚到快照 | 低(读写不阻塞) |
UPDATE | 否 | 排他锁 | 完全支持 | 高(会阻塞其他写操作) |
INSERT | 否 | 排他锁 | 完全支持 | 中(可能触发索引重建) |
DELETE | 否 | 排他锁 | 完全支持 | 高(会阻塞其他操作) |
- 排他锁(X锁):写锁,一个事务独占资源,其他事务不能读写
- 共享锁(S锁):读锁,多个事务可同时读取,但不能写入
- 排他锁 = 独占,共享锁 = 共享读
普通 SELECT 是完全无锁的,不会阻塞其他事务的写操作,也不会被写操作阻塞。只有显式加锁的SELECT才会影响并发。
七、总结
连接管理
- 使用连接池管理数据库连接
- 合理配置连接池参数
- 及时释放连接回池
事务控制
- 明确控制事务边界
- 及时提交或回滚事务
- 处理并发场景下的数据一致性
错误处理
- 实现适当的重试机制
- 记录详细的错误日志
- 区分业务错误和系统错误
性能优化
- 使用预处理语句防止 SQL 注入
- 合理使用批量操作
- 监控连接池使用情况
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。