Python Flask框架操作数据库的方法
作者:程序猿知秋
简介
SQLAlchamy 是 Python 中一个通过 ORM 操作数据库的框架
SQLAlchemy(对象关系映射器)提供了一种方法,用于将用户定义的Python类与数据库表相关联,并将这些类(对象)的实例与其对应表中的行相关联。
简单理解: 创建一个类,一个类对应了一个数据库中的一张表,类的属性名对应了表中的字段名,这个类称为映射类
SQLAlchemy本身无法操作数据库,其必须使用 pymsql 等第三方插件,从而实现对数据库的操作,如:mysql数据库
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
使用
flask_sqlalchamy
Flask 中最方便用的数据库框架是 flask_sqlalchamy,是对 SQLAlchamy 在 Flask 中的扩展,它主要在于简化Flask 中 sqlalchamy的使用
安装
pip install flask_sqlalchemy
初始化
为了方便管理数据库相关的配置项,flask_sqlalchamy 中采用app.config进行相关属性配置
配置变量的名称必须使用大写,写入配置的语句一般会放到扩展类实例化语句之前
配置项 | 说明 |
SQLALCHEMY_DATABASE_URI | 数据库的链接地址 |
SQLALCHEMY_BINDS | 访问多个数据库时,用于设置数据库链接池地址 |
SQLALCHEMY_ECHO | 是否打印底层执行的SQL语句 |
SQLALCHEMY_RECORD_QUERIES | 是否记录执行的查询语句,用于慢查询分析 |
SQLALCHEMY_TRACK_MODIFICATIONS | 是否追踪数据库变化 |
SQLALCHEMY_ENGINE_OPTIONS | 设置针对 sqlalchemy本体的配置项 |
设置数据库链接
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 设置数据库链接地址 app.config['SQLALCHEMY_DATABASE_URI']='mysql+pymysql://root:123456@127.0.0.1:3306/test' # 设置显示底层执行的sql语句 app.config['SQLALCHEMY_ECHO'] = True # 初始化组件对象,关联flask应用 db = SQLAlchemy(app)
构建模型
模型简单来说就是数据库中的一张表定义,需要有名称,字段,在 Python 中用一个类来表示,由于需要和数据库的表对应,模型必须继承自 SQLAlchamy 的 Model 类
from flask import Flask from flask_sqlalchemy import SQLAlchemy import datetime app = Flask(__name__) # 设置数据库链接地址 app.config['SQLALCHEMY_DATABASE_URI']='mysql+pymysql://root:123456@127.0.0.1:3306/test' # 设置显示底层执行的sql语句 app.config['SQLALCHEMY_ECHO'] = True # 初始化组件对象,关联flask应用 db = SQLAlchemy(app) # 构建模型类 class User(db.Model): # 设置表名,表名默认为类名小写 __tablename__ = 'user' id = db.Column(db.Integer,primary_key=True) # 设置主键,默认自增 name = db.Column(db.String(20)) age = db.Column(db.Integer,default=20) #设置默认值 createtime=db.Column(db.DateTime()) def __repr__(self): # 自定义 交互模式 & print() 的对象打印 return "(%s, %s, %s, %s)" % (self.id, self.name, self.age,self.createtime) if __name__ == '__main__': app.run(debug=True)
__repr__
方法定义了一个对象的比较易读的显式方式
常用字段类
字段类型 | 说明 |
---|---|
db.Integer | 整数 |
db.String(size) | 字符串 size 为字符串长度 |
db.DateTime | 日期时间 |
db.Date | 日期 |
db.Text | 长文本,可以存放 CLOB (二进制数据) |
db.Float | 浮点数字 |
db.Boolean | 布尔值 |
常用字段选项
字段类型 | 说明 |
---|---|
primary_key | 设置表的主键,默认自增 |
unique | 设置唯一索引 |
nullable | 非空约束 |
default | 设置默认值 |
index | 创建索引 |
创建库表
@app.route("/initDB") def init_db(): # 删除所有继承睡在db.Model的表 db.drop_all() # 创建所有继承自 db.Model的表 db.create_all() return "initdb"
注:db.drop_all() 和 db.create_all() 放在 main中不启作用,所以单独写一个方法进行初始化。
一般库表的创建与应用都是分开的,单独创建库表,没必要将此类语句写在应用中
数据操作
增加数据
@app.route("/add") def add(): # 添加数据 user1 = User(name='zhao', age=33, createtime=datetime.datetime.now()) # 将模型添加到会话中 db.session.add(user1) # 添加多条记录 # db.session.add_all([user1,user2,user3]) # 提交会话, sqlalchemy会自动创建隐式事务,失败会自动回滚 db.session.commit() return "add"
注:
- sqlalchemy会自动创建事务,并将数据的操作都包含在一个事务中,提交会话就会提交事务
- session 是一个与数据库通信的会话,是 SQLAlchamy 框架与数据库交互的代理
- 可以调用 session.rollback() 回滚掉未提交的变化
查询数据
简单查询
@app.route("/query") def query(): # 查询所有用户数据, 返回User列表, data = User.query.all() for item in data: print(item.name,item.age) # 查询有多少个用户,返回结果数 data = User.query.count() print(data) # 查询第1个用户 , 返回User对象 data = User.query.first() print(data) # 查询id为4的用户, 返回User对象 data = User.query.get(4) print(data) return "query"
条件过滤查询
@app.route("/query2") def query2(): # 查询 name 等于 zhao 的记录集中第一条记录 data = User.query.filter(User.name =='zhao').first() # 查询 name 不等于 zhao 的所有记录 data = User.query.filter(User.name != 'zhao').all() # 前模糊查询 data = User.query.filter(User.name.like("%zhao")).all() print(data) # 查询名字以 zhao 开头的用户 data = User.query.filter(User.name.startswith('zhao')).all() # 查询名字是 zhao,并且年龄是22岁的用户 from sqlalchemy import and_ data = User.query.filter(and_(User.name=='zhao', User.age==22)).all() # 查询名字是 zhao或者hong的用户 from sqlalchemy import or_ data = User.query.filter(or_(User.name=='zhao', User.name=='hong')).all() # 查询id为[1, 2, 3]的用户 data = User.query.filter(User.id.in_([1, 2, 3])).all() # 分页查询 # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 data = User.query.order_by(User.age, User.id.desc()).limit(5).all() # 分页查询, 每页1个, 查询第2页的数据 paginate(页码, 每页条数) pn = User.query.paginate(page=2, per_page=1) print("总页数:"+str(pn.pages)+" 当前页码:"+str(pn.page)+"当前页的数据:"+ str(pn.items)+" 总条数:"+str(pn.total)) # 分组查询 # 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 from sqlalchemy import func data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all() return "query2"
常用查询方法
方法名称 | 说明 |
---|---|
all() | 返回所有查询记录的列表 |
first() | 返回查询的第一条记录,如果未找到,则返回None |
get(id) | 传入主键值作为参数,返回指定主键值的记录,如果未找到,则返回None |
count() | 返回查询结果的数量 |
paginate() | 返回一个Pagination对象,可以对记录进行分页处理 |
更新数据
@app.route("/update") def update(): # 更新用户年龄为44,条件是:用户名=zhao # 相当于:update user set age = 44 where name = 'zhao'; User.query.filter(User.name == 'zhao').update({'age': 44}) # 提交会话 db.session.commit() return "update"
删除数据
@app.route("/del") def delete(): # 删除id=1的用户 # 相当于:delete from user where id=1 User.query.filter(User.id == 1).delete() # 提交会话 db.session.commit() return "del"
多表关联查询
连接查询
开发中有 联表查询需求 时, 一般会使用 join连接查询
db.session.query(主表模型字段1, 主表模型字段2, 从表模型字段1, xx.. ).join(从表模型类, 主表模型类.主键 == 从表模型类.外键)
示例
from flask import Flask from flask_sqlalchemy import SQLAlchemy import datetime app = Flask(__name__) # 设置数据库链接地址 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/test' # 设置显示底层执行的sql语句 app.config['SQLALCHEMY_ECHO'] = True # 初始化组件对象,关联flask应用 db = SQLAlchemy(app) # 构建模型类 class User(db.Model): # 设置表名,表名默认为类名小写 __tablename__ = 'user' id = db.Column(db.Integer, primary_key=True) # 设置主键,默认自增 name = db.Column(db.String(20)) age = db.Column(db.Integer, default=20) # 设置默认值 createtime = db.Column(db.DateTime()) def __repr__(self): # 自定义 交互模式 & print() 的对象打印 return "(%s, %s, %s, %s)" % (self.id, self.name, self.age, self.createtime) class Address(db.Model): __tablename__ = 'address' id = db.Column(db.Integer, primary_key=True) detail = db.Column(db.String(20)) user_id = db.Column(db.Integer) # 定义外键 if __name__ == '__main__': app.run(debug=True) @app.route("/") def index(): return "index" @app.route("/query") def query(): """ 查询多表数据 需求: 查询姓名为"hong" 的用户id和地址信息""" # sqlalchemy的join查询 # 相当于:SELECT user.id AS user_id, address.detail AS address_detail # FROM user INNER JOIN address ON user.id = address.user_id data = db.session.query(User.id, Address.detail).join(Address, User.id == Address.user_id).filter( User.name == 'hong').all() for item in data: print(item.detail, item.id) return "query"
到此这篇关于Python Flask的数据库操作的文章就介绍到这了,更多相关Python Flask操作数据库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!