python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python sqlalchemy操作数据库

Python中使用sqlalchemy操作数据库的问题总结

作者:伍华聪

在探索使用 FastAPI, SQLAlchemy, Pydantic,Redis, JWT 构建的项目的时候,其中数据库访问采用SQLAlchemy,并采用异步方式,这篇文章主要介绍了在Python中使用sqlalchemy来操作数据库的几个小总结,需要的朋友可以参考下

在探索使用 FastAPI, SQLAlchemy, Pydantic,Redis, JWT 构建的项目的时候,其中数据库访问采用SQLAlchemy,并采用异步方式。数据库操作和控制器操作,采用基类继承的方式减少重复代码,提高代码复用性。在这个过程中设计接口和测试的时候,对一些问题进行跟踪解决,并记录供参考。

1、SQLAlchemy事务处理

在异步环境中,批量更新操作需要使用异步方法来执行查询和提交事务。

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
    """批量更新对象"""
    try:
        async with db.begin():  # 使用事务块确保批量操作的一致性
            for obj_in in obj_in_list:
                # 查询对象
                query = select(self.model).filter(self.model.id == obj_in.id)
                result = await db.execute(query)
                db_obj = result.scalars().first()
                if db_obj:
                    # 获取更新数据
                    update_data = obj_in.model_dump(skip_defaults=True)
                    # 更新对象字段
                    for field, value in update_data.items():
                        setattr(db_obj, field, value)
        return True
    except SQLAlchemyError as e:
        print(e)
        # 异常处理时,事务会自动回滚
        return False

在这个改进后的代码中:

这种方式确保了在异步环境中批量更新操作的正确性和一致性。

在使用 async with db.begin() 进行事务管理时,事务会自动提交。如果在事务块内执行的所有操作都成功,事务会在退出时自动提交;如果出现异常,事务会自动回滚。

因此,手动调用 await db.commit() 是不必要的,因为事务块会处理这些操作。如果你不使用事务块,并希望手动控制事务的提交,可以如下修改:

async def update_range(self, obj_in_list: List[DtoType], db: AsyncSession) -> bool:
    """批量更新对象"""
    try:
        for obj_in in obj_in_list:
            query = select(self.model).filter(self.model.id == obj_in.id)
            result = await db.execute(query)
            db_obj = result.scalars().first()
            if db_obj:
                update_data = obj_in.model_dump(skip_defaults=True)
                for field, value in update_data.items():
                    setattr(db_obj, field, value)
        await db.commit()  # 手动提交事务
        return True
    except SQLAlchemyError as e:
        print(e)
        await db.rollback()  # 确保在出错时回滚事务
        return False

在这个手动提交事务的例子中:

根据需求选择合适的方法进行事务管理。事务块方式通常是更安全和简洁的选择。

 在异步环境中,create_update 方法需要对数据库进行异步查询、更新或创建操作。

async def create_update(
    self, obj_in: DtoType, id: PrimaryKeyType, db: AsyncSession
) -> bool:
    """创建或更新对象"""
    try:
        # 查询对象
        query = select(self.model).filter(self.model.id == id)
        result = await db.execute(query)
        db_obj = result.scalars().first()
        if db_obj:
            # 更新对象
            return await self.update(obj_in, db)
        else:
            # 创建对象
            return await self.create(obj_in, db)
    except SQLAlchemyError as e:
        print(e)
        # 确保在出错时回滚事务
        await db.rollback()
        return False

在这个代码中:

在异步环境中,批量插入对象通常需要使用异步方法来执行数据库操作。由于 bulk_insert_mappings 在 SQLAlchemy 的异步版本中可能不直接支持,你可以使用 add_all 方法来批量添加对象。

async def save_import(self, data: List[DtoType], db: AsyncSession) -> bool:
    """批量导入对象"""
    try:
        # 将 DTO 转换为模型实例
        db_objs = [self.model(**obj_in.model_dump()) for obj_in in data]
        # 批量添加对象
        db.add_all(db_objs)
        # 提交事务
        await db.commit()
        return True
    except SQLAlchemyError as e:
        print(e)
        await db.rollback()  # 确保在出错时回滚事务
        return False

代码说明:

这种方式确保了在异步环境中正确地进行批量导入操作,并处理可能出现的异常。

2、在 SQLAlchemy 中select(...).where(...) 和 select(...).filter(...)的差异

在 SQLAlchemy 中,select(...).where(...) 和 select(...).filter(...) 都用于构造查询条件,但它们有一些细微的差别和适用场景。

1. where(...)

2. filter(...)

主要差异

使用 where 的示例(SQLAlchemy Core):

from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
    query = select(self.model).where(self.model.id == id)
    result = await db.execute(query)
    return result.scalars().first()

使用 filter 的示例(SQLAlchemy ORM):

from sqlalchemy.orm import sessionmaker
async def get(self, id: int, db: AsyncSession) -> Optional[ModelType]:
    query = select(self.model).filter(self.model.id == id)
    result = await db.execute(query)
    return result.scalars().first()

总结

在 SQLAlchemy 2.0 及更高版本中,select 的 where 和 filter 的用法变得越来越一致,你可以根据自己的习惯和需求选择其中一种。在实际开发中,选择哪一种方法通常取决于你的代码上下文和个人偏好。

3、model_dump(exclude_unset=True) 和model_dump(skip_defaults=True)有什么差异

model_dump(exclude_unset=True) 和 model_dump(skip_defaults=True) 是用于处理模型实例的序列化方法,它们的用途和行为略有不同。这两个方法通常用于将模型实例转换为字典,以便进行进一步的处理或传输。

model_dump(exclude_unset=True)

exclude_unset=True 是一个选项,通常用于序列化方法中,表示在转换模型实例为字典时,排除那些未设置的字段。

# 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值
model_instance = MyModel(name='Alice', age=25)
# 如果 age 的默认值是 0, exclude_unset=True 将只包含 'name'
serialized_data = model_instance.model_dump(exclude_unset=True)

model_dump(skip_defaults=True)

skip_defaults=True 是另一个选项,表示在转换模型实例为字典时,排除那些使用了默认值的字段。

# 假设模型有字段 'name' 和 'age',且 'age' 使用了默认值
model_instance = MyModel(name='Alice', age=25)
# 如果 age 的默认值是 0, skip_defaults=True 将只包含 'name'
serialized_data = model_instance.model_dump(skip_defaults=True)

主要区别

4、使用**kwargs 参数,在接口中实现数据软删除的处理

例如我们在删除接口中,如果传递了 kwargs 参数,则进行软删除(更新记录),否则进行硬删除(删除记录)。

async def delete_byid(self, id: PrimaryKeyType, db: AsyncSession, **kwargs) -> bool:
        """根据主键删除一个对象
        :param kwargs: for soft deletion only
        """
        if not kwargs:
            result = await db.execute(sa_delete(self.model).where(self.model.id == id))
        else:
            result = await db.execute(
                sa_update(self.model).where(self.model.id == id)<strong>.values(**</strong><strong>kwargs)</strong>
            )
        await db.commit()
        return result.rowcount > 0

实例代码如下所示。

# 示例模型
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Boolean
Base = declarative_base()
class Customer(Base):
    __tablename__ = 'customer'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    is_deleted = Column(Boolean, default=False)
# 示例使用
async def main():
    async with AsyncSession(engine) as session:
        controller = BaseController(Customer)
        # 硬删除
        result = await controller.delete_byid(1, session)
        print(f"Hard delete successful: {result}")
        # 软删除
        result = await controller.delete_byid(2, session, is_deleted=True)
        print(f"Soft delete successful: {result}")
# 确保运行主程序
import asyncio
if __name__ == "__main__":
    asyncio.run(main())

注意事项

# 示例硬删除调用
await controller.delete_byid(1, session)
# 示例软删除调用
await controller.delete_byid(2, session, is_deleted=True)

如果我们的is_deleted 字段是Int类型的,如下所示,那么处理有所不同

class Customer(Base):
    __tablename__ = "t_customer"
    id = Column(String, primary_key=True, comment="主键")
    name = Column(String, comment="姓名")
    age = Column(Integer, comment="年龄")
    creator = Column(String, comment="创建人")
    createtime = Column(DateTime, comment="创建时间")
    is_deleted = Column(Integer, comment="是否删除")

操作代码

# 硬删除
        result = await controller.delete_byid("1", session)
        print(f"Hard delete successful: {result}")
        # 软删除
        result = await controller.delete_byid("2", session, <strong>is_deleted=1</strong>)
        print(f"Soft delete successful: {result}")

注意事项

通过确保正确传递参数并且模型包含正确的字段,你应该能够正确执行软删除和硬删除操作。

5、Python处理接口的时候,Iterable 和List有什么差异

在 Python 中,Iterable 和 List 是两个不同的概念,它们有各自的特点和用途:

Iterable

Iterable 是一个更广泛的概念,指的是任何可以返回一个迭代器的对象。迭代器是一个实现了 __iter__() 方法的对象,能够逐个返回元素。几乎所有的容器类型(如列表、元组、字典、集合等)都是可迭代的。要检查一个对象是否是可迭代的,可以使用 collections.abc.Iterable 来进行检查。

特点

from collections.abc import Iterable
print(isinstance([1, 2, 3], Iterable))  # True
print(isinstance((1, 2, 3), Iterable))  # True
print(isinstance({1, 2, 3}, Iterable))  # True
print(isinstance({'a': 1}, Iterable))   # True
print(isinstance((x for x in range(3)), Iterable))  # True

List

List 是 Python 中的一种具体的容器类型,表示一个有序的元素集合,可以包含重复的元素。它是最常用的可变序列类型之一,支持索引访问、切片操作以及其他多种方法来操作列表中的元素。

特点

my_list = [1, 2, 3]
print(my_list)  # [1, 2, 3]
my_list.append(4)  # [1, 2, 3, 4]
my_list[0] = 10  # [10, 2, 3, 4]

总结一下:

Iterable 是一个抽象概念,而 List 是一个具体的实现。你可以在 List 之上使用许多操作和方法来处理数据,而 Iterable 主要关注的是是否可以进行迭代。

因此接收结合的处理,我们可以使用Iterable接口更加通用一些。

async def create_range(
        self, obj_in_list: Iterable[DtoType], db: AsyncSession
    ) -> bool:
        """批量创建对象"""
        try:
            # 将 DTO 转换为模型实例
            db_objs = [self.model(**obj_in.model_dump()) for obj_in in obj_in_list]
            # 批量添加到数据库
            db.add_all(db_objs)
            await db.commit()
            return True
        except SQLAlchemyError as e:
            print(e)
            await db.rollback()  # 确保在出错时回滚事务
            return False

以上就是在Python中使用sqlalchemy来操作数据库的时候,对一些小问题的总结,供大家参考。

到此这篇关于在Python中使用sqlalchemy来操作数据库的几个小总结的文章就介绍到这了,更多相关Python sqlalchemy操作数据库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文