python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Flask实现MySQL增删改查API

利用Flask实现MySQL数据库增删改查API的全过程

作者:xcLeigh

这篇文章主要介绍了如何用Flask实现MySQL数据库增删改查API,本文围绕用Flask实现MySQL数据库增删改查(CRUD)API展开,并附上完整代码示例,帮助开发者掌握Flask结合MySQL开发API的方法,需要的朋友可以参考下

前言

Python作为一门简洁、易读、功能强大的编程语言,其基础语法是入门学习的核心。掌握好基础语法,能为后续的编程实践打下坚实的基础。本文将全面讲解Python3的基础语法知识,适合编程初学者系统学习。Python以其简洁优雅的语法和强大的通用性,成为当今最受欢迎的编程语言。本专栏旨在系统性地带你从零基础入门到精通Python核心。无论你是零基础小白还是希望进阶的专业开发者,都将通过清晰的讲解、丰富的实例和实战项目,逐步掌握语法基础、核心数据结构、函数与模块、面向对象编程、文件处理、主流库应用(如数据分析、Web开发、自动化)以及面向对象高级特性,最终具备独立开发能力和解决复杂问题的思维,高效应对数据分析、人工智能、Web应用、自动化脚本等广泛领域的实际需求。

在Web开发中,API常需与数据库交互以实现数据的持久化存储,MySQL作为主流关系型数据库,广泛用于各类项目。本文基于Flask框架,结合PyMySQL库,实现对MySQL数据库的增删改查(CRUD)API,适合有基础Flask知识和MySQL基础的开发者,完整覆盖环境搭建、数据库设计、API开发及测试全流程。

一、项目准备

(一)技术栈选择

(二)环境要求

  1. Python版本:Python 3.6及以上,确保兼容Flask和PyMySQL的最新特性。
  2. 依赖库安装:通过pip命令安装所需库,命令如下:
# 安装Flask
pip install flask
# 安装PyMySQL(连接MySQL)
pip install pymysql
# 安装Flask-SQLAlchemy(可选,ORM工具,简化数据库操作)
pip install flask-sqlalchemy

本文先使用PyMySQL原生操作数据库,再补充Flask-SQLAlchemy的实现方式,满足不同开发习惯需求。

(三)MySQL数据库准备

  1. 启动MySQL服务,通过Navicat或MySQL Workbench连接数据库,执行以下SQL语句创建项目所需的数据库和表:
-- 创建数据库(命名为flask_mysql_api)
CREATE DATABASE IF NOT EXISTS flask_mysql_api CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 使用数据库
USE flask_mysql_api;

-- 创建用户表(users),包含id(主键)、name(姓名)、age(年龄)、email(邮箱,唯一)
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    age INT NOT NULL CHECK (age > 0),
    email VARCHAR(100) NOT NULL UNIQUE,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
  1. 验证表结构:执行DESCRIBE users;,确认字段名、类型、约束是否正确。

二、项目结构搭建

为保证代码可读性和可维护性,创建如下项目目录结构:

flask-mysql-api/          # 项目根目录
├── app.py                # 主程序(API路由、数据库连接、逻辑处理)
├── config.py             # 配置文件(数据库连接信息、Flask配置)
└── requirements.txt      # 项目依赖清单(可选,方便他人部署)
  1. config.py:存储数据库连接配置,避免硬编码,示例代码如下:
# config.py
class Config:
    # MySQL数据库连接信息
    MYSQL_HOST = '127.0.0.1'    # 数据库主机地址(本地为127.0.0.1)
    MYSQL_PORT = 3306           # 数据库端口(默认3306)
    MYSQL_USER = 'root'         # 数据库用户名(根据实际情况修改)
    MYSQL_PASSWORD = '123456'   # 数据库密码(根据实际情况修改)
    MYSQL_DB = 'flask_mysql_api'# 数据库名(与前文创建的一致)
    MYSQL_CHARSET = 'utf8mb4'   # 字符集

# 开发环境配置(继承Config)
class DevelopmentConfig(Config):
    DEBUG = True  # 开启调试模式

# 生产环境配置(继承Config)
class ProductionConfig(Config):
    DEBUG = False  # 关闭调试模式
    # 生产环境可添加数据库连接池配置,提升性能
    MYSQL_POOL_SIZE = 10
    MYSQL_MAX_OVERFLOW = 20

# 配置映射,方便切换环境
config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}
  1. requirements.txt:记录依赖库及版本,执行pip freeze > requirements.txt生成,内容示例:
Flask==2.3.3
PyMySQL==1.1.0
Flask-SQLAlchemy==3.1.1

三、基于PyMySQL实现CRUD API

(一)数据库连接工具函数

app.py中,先编写数据库连接和关闭的工具函数,复用连接逻辑,避免重复代码:

# app.py
from flask import Flask, request, jsonify
import pymysql
from config import config

# 初始化Flask应用
app = Flask(__name__)
# 加载配置(默认开发环境)
app.config.from_object(config['default'])

# 数据库连接函数
def get_db_connection():
    try:
        connection = pymysql.connect(
            host=app.config['MYSQL_HOST'],
            port=app.config['MYSQL_PORT'],
            user=app.config['MYSQL_USER'],
            password=app.config['MYSQL_PASSWORD'],
            db=app.config['MYSQL_DB'],
            charset=app.config['MYSQL_CHARSET'],
            cursorclass=pymysql.cursors.DictCursor  # 游标返回字典格式(便于转换为JSON)
        )
        return connection
    except pymysql.Error as e:
        app.logger.error(f"数据库连接失败:{str(e)}")
        return None

# 关闭数据库连接函数
def close_db_connection(connection, cursor):
    if cursor:
        cursor.close()
    if connection:
        connection.close()

(二)实现增删改查API路由

1. 新增用户(Create)

@app.route('/api/users', methods=['POST'])
def create_user():
    # 获取请求体JSON数据
    data = request.get_json()
    # 验证必填字段
    required_fields = ['name', 'age', 'email']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必填字段(name/age/email)'}), 400

    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        # 执行插入SQL
        sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
        cursor.execute(sql, (data['name'], data['age'], data['email']))
        connection.commit()  # 提交事务

        # 获取新增用户的ID,查询并返回完整信息
        user_id = cursor.lastrowid
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        new_user = cursor.fetchone()

        return jsonify({'message': '用户创建成功', 'user': new_user}), 201  # 201表示创建成功

    except pymysql.IntegrityError as e:
        # 处理唯一约束冲突(如email重复)
        connection.rollback()  # 回滚事务
        return jsonify({'error': '邮箱已存在', 'detail': str(e)}), 409
    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"创建用户失败:{str(e)}")
        return jsonify({'error': '创建用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

2. 查询所有用户(Read All)

@app.route('/api/users', methods=['GET'])
def get_all_users():
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users ORDER BY create_time DESC")
        users = cursor.fetchall()  # 获取所有数据

        return jsonify({'count': len(users), 'users': users}), 200

    except Exception as e:
        app.logger.error(f"查询所有用户失败:{str(e)}")
        return jsonify({'error': '查询用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

3. 查询单个用户(Read One)

@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_single_user(user_id):
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user = cursor.fetchone()  # 获取单条数据

        if not user:
            return jsonify({'error': '用户不存在'}), 404

        return jsonify({'user': user}), 200

    except Exception as e:
        app.logger.error(f"查询用户{user_id}失败:{str(e)}")
        return jsonify({'error': '查询用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

4. 更新用户(Update)

@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    data = request.get_json()
    # 验证至少有一个可更新字段
    update_fields = ['name', 'age', 'email']
    if not any(field in data for field in update_fields):
        return jsonify({'error': '需提供至少一个更新字段(name/age/email)'}), 400

    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        # 先检查用户是否存在
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return jsonify({'error': '用户不存在'}), 404

        # 构建动态更新SQL(避免更新未提供的字段)
        set_clause = ", ".join([f"{field} = %s" for field in data if field in update_fields])
        values = [data[field] for field in data if field in update_fields]
        values.append(user_id)  # 最后添加user_id用于WHERE条件

        sql = f"UPDATE users SET {set_clause} WHERE id = %s"
        cursor.execute(sql, values)
        connection.commit()

        # 查询更新后的用户信息
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        updated_user = cursor.fetchone()

        return jsonify({'message': '用户更新成功', 'user': updated_user}), 200

    except pymysql.IntegrityError as e:
        connection.rollback()
        return jsonify({'error': '邮箱已存在', 'detail': str(e)}), 409
    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"更新用户{user_id}失败:{str(e)}")
        return jsonify({'error': '更新用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

5. 删除用户(Delete)

@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        # 检查用户是否存在
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return jsonify({'error': '用户不存在'}), 404

        # 执行删除SQL
        cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
        connection.commit()

        return jsonify({'message': f'用户{user_id}删除成功'}), 200

    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"删除用户{user_id}失败:{str(e)}")
        return jsonify({'error': '删除用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

(三)启动应用

app.py末尾添加启动代码:

if __name__ == '__main__':
    # 开发环境使用5000端口,生产环境需修改为80/443或其他端口
    app.run(host='0.0.0.0', port=5000, debug=app.config['DEBUG'])

四、基于Flask-SQLAlchemy实现CRUD API(可选)

Flask-SQLAlchemy是Flask的ORM(对象关系映射)扩展,可将Python类映射到数据库表,简化SQL操作。以下是简化版实现:

(一)初始化SQLAlchemy

修改app.py的初始化部分:

# app.py(Flask-SQLAlchemy版)
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from config import config

app = Flask(__name__)
app.config.from_object(config['default'])

# 配置SQLAlchemy连接信息(从config中读取)
app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{app.config['MYSQL_USER']}:{app.config['MYSQL_PASSWORD']}@{app.config['MYSQL_HOST']}:{app.config['MYSQL_PORT']}/{app.config['MYSQL_DB']}?charset={app.config['MYSQL_CHARSET']}"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # 关闭SQLAlchemy的修改跟踪(提升性能)

# 初始化SQLAlchemy
db = SQLAlchemy(app)

# 定义User模型(映射到users表)
class User(db.Model):
    __tablename__ = 'users'  # 数据库表名
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(50), nullable=False)
    age = db.Column(db.Integer, nullable=False)
    email = db.Column(db.String(100), nullable=False, unique=True)
    create_time = db.Column(db.TIMESTAMP, default=db.func.current_timestamp())

    # 将模型转换为字典(便于返回JSON)
    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'age': self.age,
            'email': self.email,
            'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S')
        }

(二)简化版CRUD路由示例(以新增和查询为例)

# 新增用户(Flask-SQLAlchemy版)
@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    required_fields = ['name', 'age', 'email']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必填字段'}), 400

    try:
        # 创建User对象(无需手动写SQL)
        new_user = User(
            name=data['name'],
            age=data['age'],
            email=data['email']
        )
        db.session.add(new_user)  # 添加到会话
        db.session.commit()       # 提交会话(等同于SQL的COMMIT)
        return jsonify({'message': '用户创建成功', 'user': new_user.to_dict()}), 201
    except Exception as e:
       db.session.rollback()  # 回滚事务
        return jsonify({'error': '创建用户失败', 'detail': str(e)}), 500

# 查询所有用户(Flask-SQLAlchemy版)
@app.route('/api/users', methods=['GET'])
def get_all_users():
    try:
        # 等价于SELECT * FROM users ORDER BY create_time DESC
        users = User.query.order_by(User.create_time.desc()).all()
        return jsonify({
            'count': len(users),
            'users': [user.to_dict() for user in users]
        }), 200
    except Exception as e:
        return jsonify({'error': '查询用户失败', 'detail': str(e)}), 500

# 其他路由(查询单个、更新、删除)实现类似,此处省略

五、API测试

完成API开发后,需验证各接口功能是否正常,推荐使用Postman工具,测试步骤如下:

(一)启动应用

运行app.py

python app.py

控制台显示* Running on http://0.0.0.0:5000/表示启动成功。

(二)测试各接口

新增用户

{
    "name": "张三",
    "age": 25,
    "email": "zhangsan@example.com"
}

查询所有用户

查询单个用户

更新用户

{
    "age": 26,
    "email": "zhangsan_update@example.com"
}

删除用户

六、项目优化建议

  1. 添加请求参数验证:使用marshmallow库对请求数据进行类型、格式验证,提升API健壮性。
  2. 实现分页查询:当用户数据量大时,查询所有用户可能导致性能问题,可添加pageper_page参数实现分页。
# 分页查询示例(PyMySQL版)
@app.route('/api/users', methods=['GET'])
def get_all_users():
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    offset = (page - 1) * per_page
    
    # 执行带分页的SQL
    cursor.execute("SELECT * FROM users ORDER BY create_time DESC LIMIT %s OFFSET %s", (per_page, offset))
    # ...
  1. 使用数据库连接池:在生产环境中,频繁创建和关闭数据库连接会消耗资源,可使用DBUtils库实现连接池。
  2. 添加日志记录:通过Flask的app.logger记录关键操作和错误信息,便于问题排查。
  3. 实现身份认证:对API添加Token验证(如JWT),限制未授权访问,保护数据安全。

七、完整代码

下面是使用PyMySQL实现的完整app.py代码,可直接运行:

from flask import Flask, request, jsonify
import pymysql

# 初始化Flask应用
app = Flask(__name__)
# 加载配置(默认开发环境)

# 数据库连接函数
def get_db_connection():
    try:
        connection = pymysql.connect(
            host="填写数据库ip",
            port=端口,
            user="用户",
            password="密码",
            db="数据库"
        )
        return connection
    except pymysql.Error as e:
        app.logger.error(f"数据库连接失败:{str(e)}")
        return None

# 关闭数据库连接函数
def close_db_connection(connection, cursor):
    if cursor:
        cursor.close()
    if connection:
        connection.close()

# 新增用户
@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    required_fields = ['name', 'age', 'email']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必填字段(name/age/email)'}), 400

    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
        cursor.execute(sql, (data['name'], data['age'], data['email']))
        connection.commit()

        user_id = cursor.lastrowid
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        new_user = cursor.fetchone()

        return jsonify({'message': '用户创建成功', 'user': new_user}), 201

    except pymysql.IntegrityError as e:
        connection.rollback()
        return jsonify({'error': '邮箱已存在', 'detail': str(e)}), 409
    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"创建用户失败:{str(e)}")
        return jsonify({'error': '创建用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

# 查询所有用户
@app.route('/api/users', methods=['GET'])
def get_all_users():
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users ORDER BY create_time DESC")
        users = cursor.fetchall()

        return jsonify({'count': len(users), 'users': users}), 200

    except Exception as e:
        app.logger.error(f"查询所有用户失败:{str(e)}")
        return jsonify({'error': '查询用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

# 查询单个用户
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_single_user(user_id):
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user = cursor.fetchone()

        if not user:
            return jsonify({'error': '用户不存在'}), 404

        return jsonify({'user': user}), 200

    except Exception as e:
        app.logger.error(f"查询用户{user_id}失败:{str(e)}")
        return jsonify({'error': '查询用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

# 更新用户
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    data = request.get_json()
    update_fields = ['name', 'age', 'email']
    if not any(field in data for field in update_fields):
        return jsonify({'error': '需提供至少一个更新字段(name/age/email)'}), 400

    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return jsonify({'error': '用户不存在'}), 404

        set_clause = ", ".join([f"{field} = %s" for field in data if field in update_fields])
        values = [data[field] for field in data if field in update_fields]
        values.append(user_id)

        sql = f"UPDATE users SET {set_clause} WHERE id = %s"
        cursor.execute(sql, values)
        connection.commit()

        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        updated_user = cursor.fetchone()

        return jsonify({'message': '用户更新成功', 'user': updated_user}), 200

    except pymysql.IntegrityError as e:
        connection.rollback()
        return jsonify({'error': '邮箱已存在', 'detail': str(e)}), 409
    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"更新用户{user_id}失败:{str(e)}")
        return jsonify({'error': '更新用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

# 删除用户
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        if not connection:
            return jsonify({'error': '数据库连接失败'}), 500

        cursor = connection.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return jsonify({'error': '用户不存在'}), 404

        cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
        connection.commit()

        return jsonify({'message': f'用户{user_id}删除成功'}), 200

    except Exception as e:
        if connection:
            connection.rollback()
        app.logger.error(f"删除用户{user_id}失败:{str(e)}")
        return jsonify({'error': '删除用户失败', 'detail': str(e)}), 500
    finally:
        close_db_connection(connection, cursor)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=app.config['DEBUG'])

地址返回: http://127.0.0.1:5000/api/users

八、总结

本文详细介绍了使用Flask和PyMySQL实现MySQL数据库增删改查API的全过程,包括环境搭建、数据库设计、API开发、测试及优化建议。通过本文的学习,你可以掌握:

  1. Flask框架基本使用方法,包括路由定义、请求处理、响应返回。
  2. PyMySQL库连接和操作MySQL数据库的技巧,如SQL执行、事务处理、异常捕获。
  3. RESTful API设计规范,如HTTP方法与CRUD操作的对应关系、状态码使用。
  4. 项目优化思路,提升API的性能、安全性和可维护性。

后续可进一步扩展功能,如添加用户认证、实现更复杂的查询逻辑、集成前端页面等,逐步完善项目。

以上就是利用Flask实现MySQL数据库增删改查API的完整代码的详细内容,更多关于Flask实现MySQL增删改查API的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文