python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python与数据库交互

从基础到高级详解Python与关系型数据库交互的完全指南

作者:Python×CATIA工业智造

在当今数据驱动的开发环境中,与关系型数据库进行高效交互已成为Python开发者​​必备的核心技能​​,本文将全面探讨Python与关系型数据库交互的各种方法和技术,有需要的小伙伴可以了解下

引言

在当今数据驱动的开发环境中,与关系型数据库进行高效交互已成为Python开发者​​必备的核心技能​​。无论是Web应用、数据分析平台还是企业级系统,都需要与MySQL、PostgreSQL、SQLite等关系型数据库进行无缝集成。Python凭借其​​简洁的语法​​、​​丰富的生态系统​​和​​强大的库支持​​,为数据库交互提供了多种高效灵活的解决方案。

本文将全面探讨Python与关系型数据库交互的各种方法和技术,从基础连接操作到高级ORM使用,从事务处理到性能优化。无论您是初学者还是经验丰富的开发者,本文都将为您提供实用的代码示例和最佳实践建议,帮助您构建更健壮、高效的数据库应用。

关系型数据库因其​​结构化数据存储​​、​​ACID事务支持​​和​​强大的SQL查询能力​​,在数据处理领域占据重要地位。掌握Python与关系型数据库的交互技巧,不仅能够提高应用程序的数据处理能力,还能为您的职业发展增添重要技能。

一、数据库连接基础

1.1 选择适当的数据库驱动

Python提供了多种数据库驱动程序,用于连接不同类型的关系型数据库。根据目标数据库的不同,需要选择相应的驱动库:

安装这些驱动通常使用pip包管理器:

pip install mysql-connector-python pymysql psycopg2

1.2 建立数据库连接

建立数据库连接是与数据库交互的第一步。以下是连接不同数据库的示例:

# MySQL连接示例
import mysql.connector

mysql_conn = mysql.connector.connect(
    host='localhost',
    user='your_username',
    password='your_password',
    database='your_database'
)
print("MySQL连接成功!")

# PostgreSQL连接示例
import psycopg2

pg_conn = psycopg2.connect(
    host='localhost',
    database='your_database',
    user='your_username',
    password='your_password'
)
print("PostgreSQL连接成功!")

# SQLite连接示例
import sqlite3

sqlite_conn = sqlite3.connect('example.db')
print("SQLite连接成功!")

1.3 连接参数配置与最佳实践

为了提高连接的安全性和性能,建议使用配置文件管理数据库连接参数:

import configparser
import mysql.connector

# 读取配置文件
config = configparser.ConfigParser()
config.read('db_config.ini')

# 获取数据库连接参数
db_params = {
    'host': config['mysql']['host'],
    'user': config['mysql']['user'],
    'password': config['mysql']['password'],
    'database': config['mysql']['database']
}

# 建立连接
try:
    conn = mysql.connector.connect(**db_params)
    if conn.is_connected():
        print("数据库连接成功!")
except Exception as e:
    print(f"连接失败: {e}")

二、执行SQL语句与CRUD操作

2.1 使用游标执行查询

游标(Cursor)是与数据库交互的核心对象,用于执行SQL语句并获取结果。

# 创建游标对象
cursor = conn.cursor()

# 执行简单查询
cursor.execute("SELECT * FROM users")

# 获取所有结果
results = cursor.fetchall()

# 遍历结果
for row in results:
    print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")

# 关闭游标
cursor.close()

2.2 参数化查询

为防止SQL注入攻击,应始终使用参数化查询:

# 参数化查询示例
user_id = 5
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

# 插入数据时的参数化查询
new_user = ('John Doe', 'john@example.com', 30)
cursor.execute("INSERT INTO users (name, email, age) VALUES (%s, %s, %s)", new_user)
conn.commit()

不同数据库的参数占位符有所不同:

数据库类型参数占位符
MySQL%s
PostgreSQL%s
SQLite?

2.3 完整的CRUD操作示例

下面是一个完整的CRUD(创建、读取、更新、删除)操作示例:

import sqlite3

# 创建连接
conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    age INTEGER
)
''')

# 插入数据
def create_user(name, email, age):
    try:
        cursor.execute(
            "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
            (name, email, age)
        )
        conn.commit()
        print("用户添加成功!")
    except sqlite3.IntegrityError:
        print("邮箱已存在!")

# 查询数据
def read_users():
    cursor.execute("SELECT * FROM users")
    return cursor.fetchall()

# 更新数据
def update_user(user_id, name=None, email=None, age=None):
    updates = []
    params = []
    
    if name:
        updates.append("name = ?")
        params.append(name)
    if email:
        updates.append("email = ?")
        params.append(email)
    if age is not None:
        updates.append("age = ?")
        params.append(age)
    
    params.append(user_id)
    cursor.execute(
        f"UPDATE users SET {', '.join(updates)} WHERE id = ?",
        params
    )
    conn.commit()

# 删除数据
def delete_user(user_id):
    cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
    conn.commit()

# 使用示例
create_user('Alice', 'alice@example.com', 25)
create_user('Bob', 'bob@example.com', 30)

print("所有用户:")
users = read_users()
for user in users:
    print(user)

update_user(1, age=26)

print("更新后的用户:")
users = read_users()
for user in users:
    print(user)

# 关闭连接
conn.close()

三、使用ORM进行高级数据库操作

3.1 ORM简介与优势

ORM(Object-Relational Mapping,对象关系映射)允许使用面向对象的方式操作数据库,无需编写原生SQL语句。主要优势包括:

3.2 SQLAlchemy基础

SQLAlchemy是Python中最流行的ORM框架之一,功能强大且灵活。

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 定义基类
Base = declarative_base()

# 定义模型
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100), unique=True)
    age = Column(Integer)

# 创建引擎和会话
engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)
session = Session()

# 创建表
Base.metadata.create_all(engine)

# 插入数据
new_user = User(name='Charlie', email='charlie@example.com', age=35)
session.add(new_user)
session.commit()

# 查询数据
users = session.query(User).filter(User.age > 25).all()
for user in users:
    print(f"{user.name}: {user.email}")

# 更新数据
user = session.query(User).filter_by(name='Charlie').first()
if user:
    user.age = 36
    session.commit()

# 删除数据
user = session.query(User).filter_by(name='Charlie').first()
if user:
    session.delete(user)
    session.commit()

# 关闭会话
session.close()

3.3 高级查询技巧

SQLAlchemy提供了丰富的查询接口,支持复杂的查询操作:

from sqlalchemy import or_, and_

# 复杂查询示例
users = session.query(User).filter(
    or_(
        User.age < 25,
        User.age > 35
    ),
    User.email.like('%@example.com')
).order_by(User.name.desc()).limit(10).all()

# 聚合查询
from sqlalchemy import func

age_stats = session.query(
    func.count(User.id),
    func.avg(User.age),
    func.min(User.age),
    func.max(User.age)
).first()

# 连接查询
class Address(Base):
    __tablename__ = 'addresses'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    street = Column(String(100))
    city = Column(String(50))
    
    user = relationship("User", back_populates="addresses")

User.addresses = relationship("Address", order_by=Address.id, back_populates="user")

# 执行连接查询
users_with_addresses = session.query(User).join(Address).filter(Address.city == 'New York').all()

3.4 Django ORM简介

对于Django项目,可以使用其内置的ORM系统:

# Django模型定义
from django.db import models

class User(models.Model):
    name = models.CharField(max_length=50)
    email = models.EmailField(unique=True)
    age = models.IntegerField()
    created_at = models.DateTimeField(auto_now_add=True)
    
    def __str__(self):
        return self.name

# 使用Django ORM进行查询
from myapp.models import User

# 创建用户
User.objects.create(name='Alice', email='alice@example.com', age=25)

# 查询用户
users = User.objects.filter(age__gt=20).order_by('-created_at')

# 更新用户
User.objects.filter(email='alice@example.com').update(age=26)

# 删除用户
User.objects.filter(email='alice@example.com').delete()

四、事务处理与连接池管理

4.1 事务管理

事务是数据库操作中的重要概念,确保一系列操作要么全部成功,要么全部失败。

import mysql.connector
from mysql.connector import Error

try:
    conn = mysql.connector.connect(
        host='localhost',
        database='your_database',
        user='your_username',
        password='your_password'
    )
    
    # 开启事务
    conn.start_transaction()
    
    cursor = conn.cursor()
    
    # 执行多个操作
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    
    # 提交事务
    conn.commit()
    print("事务执行成功!")
    
except Error as e:
    # 回滚事务
    conn.rollback()
    print(f"事务执行失败: {e}")
    
finally:
    if conn.is_connected():
        cursor.close()
        conn.close()

4.2 使用连接池

对于高并发应用,使用连接池可以显著提高性能:

from mysql.connector import pooling
import threading

# 创建连接池
dbconfig = {
    "host": "localhost",
    "user": "your_username",
    "password": "your_password",
    "database": "your_database"
}

connection_pool = pooling.MySQLConnectionPool(
    pool_name="my_pool",
    pool_size=5,
    pool_reset_session=True,
    **dbconfig
)

def query_with_pool(user_id):
    try:
        # 从连接池获取连接
        conn = connection_pool.get_connection()
        cursor = conn.cursor()
        
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        result = cursor.fetchone()
        
        print(f"用户查询结果: {result}")
        
        # 将连接返回给连接池
        cursor.close()
        conn.close()
        
    except Error as e:
        print(f"查询失败: {e}")

# 多线程使用连接池示例
threads = []
for i in range(10):
    thread = threading.Thread(target=query_with_pool, args=(i % 5 + 1,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

4.3 上下文管理器与连接管理

使用上下文管理器可以更安全地管理数据库连接:

from contextlib import contextmanager

@contextmanager
def get_db_connection():
    conn = None
    try:
        conn = connection_pool.get_connection()
        yield conn
    except Error as e:
        print(f"数据库错误: {e}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()

@contextmanager
def get_db_cursor():
    with get_db_connection() as conn:
        cursor = conn.cursor()
        try:
            yield cursor
            conn.commit()
        except:
            conn.rollback()
            raise
        finally:
            cursor.close()

# 使用上下文管理器
with get_db_cursor() as cursor:
    cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                  ("John", "john@example.com"))
    print("数据插入成功!")

五、性能优化与最佳实践

5.1 查询优化技巧

优化数据库查询可以显著提高应用程序性能:

# 批量插入示例
def bulk_insert_users(users_data):
    try:
        cursor = conn.cursor()
        
        # 批量插入
        cursor.executemany(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            users_data
        )
        
        conn.commit()
        print(f"成功插入 {len(users_data)} 条记录")
        
    except Error as e:
        conn.rollback()
        print(f"批量插入失败: {e}")

# 使用示例
users_data = [
    ('User1', 'user1@example.com', 25),
    ('User2', 'user2@example.com', 30),
    ('User3', 'user3@example.com', 35)
]
bulk_insert_users(users_data)

5.2 连接管理最佳实践

正确的连接管理对应用性能至关重要:

# 数据库工具类示例
class DatabaseManager:
    def __init__(self, **config):
        self.config = config
        self.connection_pool = None
        
    def create_pool(self, pool_size=5):
        try:
            self.connection_pool = pooling.MySQLConnectionPool(
                pool_name="app_pool",
                pool_size=pool_size,
                **self.config
            )
            print("连接池创建成功!")
        except Error as e:
            print(f"连接池创建失败: {e}")
            raise
    
    @contextmanager
    def get_connection(self):
        if not self.connection_pool:
            self.create_pool()
        
        conn = None
        try:
            conn = self.connection_pool.get_connection()
            yield conn
        finally:
            if conn:
                conn.close()
    
    @contextmanager
    def get_cursor(self, dictionary=False):
        with self.get_connection() as conn:
            cursor = conn.cursor(dictionary=dictionary)
            try:
                yield cursor
                conn.commit()
            except:
                conn.rollback()
                raise
            finally:
                cursor.close()
    
    def execute_query(self, query, params=None):
        with self.get_cursor() as cursor:
            cursor.execute(query, params or ())
            return cursor.fetchall()
    
    def execute_command(self, query, params=None):
        with self.get_cursor() as cursor:
            cursor.execute(query, params or ())
            return cursor.rowcount

# 使用示例
db_config = {
    "host": "localhost",
    "user": "your_username",
    "password": "your_password",
    "database": "your_database"
}

db_manager = DatabaseManager(**db_config)

# 执行查询
users = db_manager.execute_query("SELECT * FROM users WHERE age > %s", (25,))
for user in users:
    print(user)

# 执行命令
row_count = db_manager.execute_command(
    "UPDATE users SET age = %s WHERE id = %s",
    (26, 1)
)
print(f"更新了 {row_count} 行数据")

5.3 监控与诊断

监控数据库性能是优化的重要部分:

import time
from functools import wraps

def query_logger(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        print(f"查询执行时间: {end_time - start_time:.4f}秒")
        return result
    return wrapper

# 使用装饰器记录查询时间
class MonitoredDatabaseManager(DatabaseManager):
    @query_logger
    def execute_query(self, query, params=None):
        return super().execute_query(query, params)
    
    @query_logger
    def execute_command(self, query, params=None):
        return super().execute_command(query, params)

# 使用监控版数据库管理器
monitored_db = MonitoredDatabaseManager(**db_config)
users = monitored_db.execute_query("SELECT * FROM users WHERE age > %s", (25,))

总结

Python与关系型数据库的交互是现代应用开发的核心技能。本文全面介绍了从基础连接到高级ORM使用的各种技术,涵盖了MySQL、PostgreSQL和SQLite等主流关系型数据库。

关键要点回顾

实践建议

进一步学习

要深入了解Python与数据库交互的高级主题,可以探索以下方向:

通过掌握这些技能,您将能够构建高效、可靠的数据驱动应用,满足现代软件开发的需求。

到此这篇关于从基础到高级详解Python与关系型数据库交互的完全指南的文章就介绍到这了,更多相关Python与数据库交互内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文