微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

SQLAlchemy 异步操作

在现代的 Python 应用中,异步操作越来越受到重视,特别是在处理 I/O 密集型任务时。sqlAlchemy 也提供了对异步操作的支持,结合 aioMysqL 可以实现异步的数据库操作。

环境准备

首先,你需要安装 aioMysqL

pip install aioMysqL

步骤详解

1. 创建数据库驱动引擎

我们需要创建一个异步的数据库引擎。这里我们使用 aioMysqL 作为 MysqL 的异步驱动。

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    url='MysqL+aioMysqL://root:0908@localhost:3306/db_sqlalchemy_demo?charset=utf8mb4',
    echo=True,
    pool_size=10,
    max_overflow=30,
    pool_recycle=60 * 30,
)

2. 创建异步会话类

使用 sessionmaker 创建一个异步会话类。

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)

3. 创建模型基类

定义一个模型基类,所有的模型都将继承自这个基类。

from sqlalchemy.orm import DeclarativeBase

class Model(DeclarativeBase):
    pass

4. 创建模型

定义一个 Student 模型,包含学生的相关信息。

from sqlalchemy import Column, Integer, String, Boolean, SmallInteger, Text, DateTime
import datetime

class Student(Model):
    __tablename__ = "tb_async_student"
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    sex = Column(Boolean, default=True)
    age = Column(SmallInteger)
    class_ = Column("class", SmallInteger)
    description = Column(Text)
    status = Column(Boolean, default=1)
    addtime = Column(DateTime, default=datetime.datetime.Now)
    orders = Column(SmallInteger, default=1)

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.name}>"

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'sex': self.sex,
            'age': self.age,
            'class': self.class_,
            'description': self.description,
            'status': self.status,
            'addtime': self.addtime.strftime('%Y-%m-%d %H:%M:%s'),
            'orders': self.orders,
        }

5. 异步操作数据库

接下来,我们将展示如何进行异步的数据库操作,包括创建表、插入数据和查询数据。

import asyncio
from sqlalchemy import select

async def main():
    # 创建表
    async with engine.begin() as conn:
        await conn.run_sync(Model.Metadata.drop_all)
        await conn.run_sync(Model.Metadata.create_all)

    # 插入数据
    async with async_session() as session:
        async with session.begin():
            student1 = Student(name="小明", class_="302", sex=True, age=18, description="滚出去..")
            student2 = Student(name="小蓝", class_="303", sex=True, age=18, description="滚出去..") 
            student3 = Student(name="小非", class_="305", sex=True, age=18, description="滚出去..")
            student4 = Student(name="小富", class_="305", sex=True, age=18, description="滚出去..")
            # 添加一条数据
            session.add(student1)
            # 添加多条数据
            session.add_all([student2, student3, student4])

    # 查询数据
    async with async_session() as session:
        async with session.begin():
            q = select(Student).where(Student.class_ == 305).order_by(Student.id)
            print(q)
            # 获取一个结果
            result = await session.execute(q)
            student = result.scalar()
            if student:
                print(student.to_dict())
            # 获取多个结果
            result = await session.execute(q)
            students = result.scalars().all()
            for student in students:
                print(student.to_dict())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

本篇结束,本合集到此为止,很多用法都说得比较简单,但是并没有讲述诸多的实现原理,作者对其原理也是一知半解,无法再深入去分享相关的言谈,毕竟仅只是一篇入门合集,再加上笔者精力时间有限,写下来也是方便日后的使用查阅。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐