今日内容概要
- sqlalchemy介绍和快速使用
- 单表操作增删查改
- 一对多
- 多对多
- flask集成
内容详细
1、sqlalchemy介绍和快速使用
# SQLAlchemy是一个基于 Python实现的ORM框架 # django的orm框架---》只能在django中用,不能单独用 # SQLAlchemy单独的,可以集成到任意框架中 # peewee:轻量级 # python的异步orm框架不多, sanic, fastapi---》一旦用了异步,后续所有都需要用异步---》操作mysql,aiomysql--》操作redis,使用aioredis # 公司选择 -第一:peewee-async -第二:框架是异步---》没有使用异步orm框架---》SQLAlchemy---》生成和迁移表---》查询操作数据用原生操作 # 写django项目---》库和表已经有了 -正常操作django中建表模型---》迁移---》表 -反向生成models--》表---》models.py----》改表---》再反向生成 python manage.py inspectdb > app/models.py
1.1 执行原生sql
# 执行原生sql快速使用 import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine # 第一步:创建engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db01?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 第二步:使用 def task(): conn = engine.raw_connection() # 从连接池中取一个连接 cursor = conn.cursor() sql = "select * from cmd" cursor.execute(sql) print(cursor.fetchall()) if __name__ == '__main__': for i in range(20): t = threading.Thread(target=task) t.start() # 查询mysql的客户端连接数
2、单表操作增删查改
2.1 表迁移
# 不能创建数据库(django orm也不能) # 只能做表的创建和删除,不能做表更改(django orm能)---》借助于第三方实现 ###### 第一步:生成基类,所有表模型都要继承这个基类 django 的orm继承一个父类,Base就是那个父类 ###### 第二步:写表模型,继承父类,写字段 (注意区别于django 的orm) django的default--》可不可以传个函数内存地址---》插入的时候通过函数运算完得到的值 ###### 第三步:迁移,通过表模型,生成表
创建models.py
import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index # 第一步:生成基类,所有表模型都要继承这个基类 # django 的orm继承一个父类,Base就是那个父类 Base = declarative_base() # 第二步:写表模型,继承父类,写字段 (注意区别于django 的orm) # django的default--》可不可以传个函数内存地址---》插入的时候通过函数运算完得到的值 class Users(Base): id = Column(Integer, primary_key=True, autoincrement=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 email = Column(String(32), unique=True) # 唯一 # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) # 默认值 extra = Column(Text, nullable=True) # 大文本,可以为空 __tablename__ = 'lqz_users' # 数据库表名称,如果不写,就报错 # __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一 # Index('ix_id_name', 'name', 'email'), # 联合索引 # ) # 聚簇索(mysql主键自动建索引,聚簇索引,mysql基于聚簇索引构建的B+树),一定会有,没有显示建主键,mysql会隐藏一个 # 辅助索引:手动建的叫辅助索引---》单独减了索引---》如果你的辅助索引过多,非常影响插入效率,适度建索引
创建演示文件:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from threading import Thread from models import Base # 第三步:迁移,通过表模型,生成表 engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db01?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def create_table(): # 通过engine这个连接配置,创建出所有使用Base管理的表 Base.metadata.create_all(engine) def delete_table(): # 通过engine这个连接配置,删除出所有使用Base管理的表 Base.metadata.drop_all(engine) if __name__ == '__main__': # create_table() # 创建表 delete_table() # 删除表
2.2 简单的表操作
### 操作表,增加一条记录,以后都用conn/session(命名可以更改)操作 # 第一步:创建engin # 第二步:通过session得到连接对象 Session = sessionmaker(bind=engine) session = Session() # # 第三步:实例化得到模型类的对象,增加到数据库中 usr=Users(name='lqz001') session.add(usr) # # 第四步:提交事务 session.commit()
2.3 基于scoped_session实现线程安全
# # 以后操作数据,都用session对象---》定义在flask的函数外部还是内部? # # 放内部没问题,每次都生成一个新的session,耗费资源 # # 如果定义在函数外部,会存在 多线程并发使用同一个变量session,要把session做成并发安全的 Session = sessionmaker(bind=engine) session = scoped_session(Session) # 也是基于local,给每一个线程自己创造一个session # # 只需要记住,如果是多线程使用,或者在web框架中,使用scoped_session生成session就可以了 # # 集成到flask中,有flask-sqlalchemy第三方,内部已经处理了scoped_session # # 全局用这个一个session,不用担心并发不安全 usr = Users(name='lqz002') session.add(usr) # 线程一用:取local中取线程1的那个session,如果就给,没有就重新创造一个 # # 第四步:提交事务 session.commit()
测试线程安全
# 线程一用: 取local中取线程1的那个session,如果就给,没有就重新创造一个 # 线程二用: 取local中取线程2的那个session,如果就给,没有就重新创造一个 # # 测试:开3个线程,如果定义全局的session,在3个线程中用,session对象应该是同一个 Session = sessionmaker(bind=engine) session = Session() # session = scoped_session(Session) def task(): # usr = Users(name='lqz003') # session.add(usr) # session.commit() # print(session.registry.registry.value) # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60> print(session) # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60> # 开3个线程,如果定义scoped_session,在3个线程中用,session对象应该是不是同一个,独有的 if __name__ == '__main__': for i in range(3): t = Thread(target=task) t.start()
2.4 基本增删查改
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Users from sqlalchemy.orm import scoped_session from models import Base # 第三步:迁移,通过表模型,生成表 engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db01?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def create_table(): # 通过engine这个连接配置,创建出所有使用Base管理的表 Base.metadata.create_all(engine) def delete_table(): # 通过engine这个连接配置,删除出所有使用Base管理的表 Base.metadata.drop_all(engine) if __name__ == '__main__': # create_table() # delete_table() Session = sessionmaker(bind=engine) session = scoped_session(Session) ### 1 增加操作 # 增加一个 obj1 = Users(name="lqz003") session.add(obj1) # 增加多个,不同对象 session.add_all([ Users(name="lqz009"), Users(name="lqz008"), ]) session.commit() # 2 删除操作---》查出来再删---》 session.query(Users).filter(Users.id > 2).delete() session.commit() # 3 修改操作--》查出来改 # 传字典 session.query(Users).filter(Users.id > 0).update({"name": "lqz"}) # # 类似于django的F查询 # # 字符串加 # session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # # 数字加 # session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") session.commit() # 4 查询操作----》 r1 = session.query(Users).all() # 查询所有 # 只取age列,把name重命名为xx # 原生sql:select name as xx,age from user; # r2 = session.query(Users.name.label('xx'), Users.age).all() # # filter传的是表达式,filter_by传的是参数 # r3 = session.query(Users).filter(Users.name == "lqz").all() # # r3 = session.query(Users).filter(Users.id >= 1).all() # r4 = session.query(Users).filter_by(name='lqz').all() # r5 = session.query(Users).filter_by(name='lqz').first() # :value 和:name 相当于占位符,用params传参数 # r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='lqz').order_by( # Users.id).all() # 自定义查询sql # r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()
2.5 更多查询操作
# 更多查询 # 条件 # select * form user where name =lqz # ret = session.query(Users).filter_by(name='lqz').all() # 表达式,and条件连接 # select * from user where id >1 and name = lqz # ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all() # select * from user where id between 1,3 and name = lqz # ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'lqz').all() # 注意下划线 # select * from user where id in (1,3,4) # ret = session.query(Users).filter(Users.id.in_([1, 3, 4])).all() # # ~非,除。。外 # select * from user where id not in (1,3,4) # ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all() # # # 二次筛选 # # ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz'))).all() # from sqlalchemy import and_, or_ # # # # or_包裹的都是or条件,and_包裹的都是and条件 # ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() # ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() # ret = session.query(Users).filter( # or_( # Users.id < 2, # and_(Users.name == 'eric', Users.id > 3), # Users.extra != "" # )).all() # # 通配符,以e开头,不以e开头 # ret = session.query(Users).filter(Users.name.like('e%')).all() # ret = session.query(Users).filter(~Users.name.like('e%')).all() # # 限制,用于分页,区间 # ret = session.query(Users)[1:2] # # 排序,根据name降序排列(从大到小) # ret = session.query(Users).order_by(Users.id.desc()).all() # # 第一个条件重复后,再按第二个条件升序排 # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # # 分组 # from sqlalchemy.sql import func # select * from user group by user.extra; # ret = session.query(Users).group_by(Users.extra).all() # # 分组之后取最大id,id之和,最小id # select max(id),sum(id),min(id) from user group by name ; # ret = session.query( # func.max(Users.id), # func.sum(Users.id), # func.min(Users.id)).group_by(Users.name).all() # haviing筛选 # select max(id),sum(id),min(id) from user group by name having min(id)>2; # ret = session.query( # func.max(Users.id), # func.sum(Users.id), # func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all() # select max(id),sum(id),min(id) from user where id >=1 group by name having min(id)>2; # ret = session.query( # func.max(Users.id), # func.sum(Users.id), # func.min(Users.id)).filter(Users.id>=1).group_by(Users.name).having(func.min(Users.id) > 2).all() # 连表(默认用forinkey关联) # select * from user,favor where user.id=favor.id # ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() # join表,默认是inner join # select * from Person inner join favor on person.favor=favor.id; # ret = session.query(Person).join(Favor).all() # isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可 # ret = session.query(Person).join(Favor, isouter=True).all() # ret = session.query(Favor).join(Person, isouter=True).all() # 打印原生sql # aa = session.query(Person).join(Favor, isouter=True) # print(aa) # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上 # select * from person left join favor on person.id=favor.id; # ret = session.query(Person).join(Favor, Person.id == Favor.id, isouter=True).all() # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集 # union和union all的区别? # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union(q2).all() # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union_all(q2).all()
2.6 执行原生sql
# 执行原生sql # 查询 cursor = session.execute('select * from users') result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)', params={"value": 'lqz'}) session.commit() print(cursor.lastrowid)
3、一对多表操作
3.1 表模型创建
class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # hobby指的是tablename而不是类名 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 外键 # 跟数据库无关,不会新增字段,只用于快速链表操作 # 类名,backref用于反向查询 # 正向查询按字段,反向查询按 pers hobby = relationship('Hobby', backref='pers')
3.2 操作表
# 一对多 import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker from models import Users from sqlalchemy.orm import scoped_session from models import Base # 第三步:迁移,通过表模型,生成表 engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db01?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def create_table(): # 通过engine这个连接配置,创建出所有使用Base管理的表 Base.metadata.create_all(engine) def delete_table(): # 通过engine这个连接配置,删除出所有使用Base管理的表 Base.metadata.drop_all(engine) if __name__ == '__main__': # create_table() # delete_table() Session = sessionmaker(bind=engine) session = scoped_session(Session) from models import Hobby, Person # 1 增加数据 # 方式一 session.add_all([ Hobby(caption='乒乓球'), Hobby(caption='羽毛球'), Person(name='张三', hobby_id=1), Person(name='李四', hobby_id=1), ]) session.commit() # 方式二 person = Person(name='张九', hobby=Hobby(caption='姑娘')) session.add(person) # 方式三 hb = Hobby(caption='保龄球') # 反向字段 hb.pers = [Person(name='lqz01'), Person(name='lqz02')] session.add(hb) session.commit() # 2 查询 # 正向查询 person = session.query(Person).first() print(person.name) # 基于对象的跨表查询 print(person.hobby.caption) # 反向查询 v = session.query(Hobby).first() print(v.caption) print(v.pers) # 多条 # 链表查询 # select person.name ,hobby.caption from person left join bobby on person.hobby_id=hobby.id; person_list = session.query(Person.name, Hobby.caption).join(Hobby, isouter=True).all() # person_list = session.query(Person,Hobby).join(Hobby, isouter=True).all() for row in person_list: # print(row.name,row.caption) print(row[0].name, row[1].caption) person_list = session.query(Person).all() for row in person_list: print(row.name, row.hobby.caption) obj = session.query(Hobby).filter(Hobby.id == 1).first() persons = obj.pers print(persons) session.close()
4、多对多表操作
4.1 表模型创建
# boy girl 相亲,一个boy可以约多个女生,一个女生可以相多个男生 class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以 girls = relationship('Girl', secondary='boy2girl', backref='boys')
4.2 操作表
# 多对多 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from models import Base # 第三步:迁移,通过表模型,生成表 engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db01?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def create_table(): # 通过engine这个连接配置,创建出所有使用Base管理的表 Base.metadata.create_all(engine) def delete_table(): # 通过engine这个连接配置,删除出所有使用Base管理的表 Base.metadata.drop_all(engine) from models import Boy, Girl, Boy2Girl if __name__ == '__main__': # create_table() # delete_table() Session = sessionmaker(bind=engine) session = scoped_session(Session) # 1 增加数据 # 方式一 session.add_all([ Boy(name='彭于晏'), Boy(name='刘德华'), Girl(name='刘亦菲'), Girl(name='迪丽热巴'), ]) session.commit() s2g = Boy2Girl(boy_id=1, girl_id=1) session.add(s2g) session.commit() # 方式二 boy = Boy(name='lqz') boy.girls = [Girl(name='小红'), Girl(name='校花')] session.add(boy) session.commit() # 方式三 girl = Girl(name='小梅') girl.boys = [Boy(name='lqz001'), Boy(name='lqz002')] session.add(girl) session.commit() # 基于对象的跨表查 # 使用relationship正向查询 v = session.query(Boy).first() print(v.name) print(v.girls) # 使用relationship反向查询 v = session.query(Girl).first() print(v.name) print(v.boys)
5、flask集成
# Flask_SQLAlchemy 操作数据库 # flask_migrate 模拟django的表迁移 pip3 install flask_migrate # flask_migrate使用步骤 from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # 全局SQLAlchemy app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') # 将db注册到app中,加载配置文件,flask-session,用一个类包裹一下app db.init_app(app) # flask_script创建命令 runserver命令 ,自定义名字 # 下面三句会创建出两个命令:runserver db 命令(flask_migrate) manager=Manager(app) Migrate(app, db) manager.add_command('db',MigrateCommand ) # 添加一个db命令,原来有了runserver命令了 # 直接使用命令迁移表即可 # 1 初始化 python3 manage.py db init # 刚开始干,生成一个migrate文件夹 # 2 创建表,修改表 python3 manage.py db migrate # 等同于 makemigartions python3 manage.py db upgrade # 等同于 migrate
# Flask_SQLAlchemy给你包装了基类,和session,以后拿到db db = SQLAlchemy() # 全局 SQLAlchemy # 增删查改数据-->并发安全 db.session.query() # 表模型要继承基表 class Users(db.Model):