# SQLAlchemy操作数据库 # 依赖 ```txt sqlalchemy flask-sqlalchemy alembic flask-migrate ``` # ORM 使用ORM可以避免SQL注入问题,但是仍然需要对传入的查询参数进行验证。ORM把底层的SQL数据实体转化成高层的Python对象,这样一来,是需要通过Python代码即可完成数据库的操作,ORM实现的三层映射关系: - 表----------类 - 字段(列)---------类属性 - 记录(行)---------类实例 使用原生SQL语句 ```sql CREATE TABLE contacts( name varchar(100) NOT NULL, phone_number verchar(32), ); ``` 使用ORM ```python from foo_orm import Model, Column, String class Contact(Model): __tablename__ = 'contacts' name = Column(String(100), nullable=False) phone_number = Column(String(32)) ``` 使用原生SQL语句 ```sql INSERT INTO contacts(name, phone_number) VALUES('wang', '123456789'); ``` 使用ORM ```python contact = Contact(name='wang', phone_number='123456789') ``` SQLAlchemy是python社区使用最广泛的ORM之一。 # Flask-SQLAlchemy管理数据库 安装 ```bash pip install flask-sqlalchemy ``` 使用 ```python from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) db = SQLAlchemy(app) ``` ## 定义数据库模型 ```python import os import click from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) #连接数据库 app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///' + os.path.join(app.root_path, 'note.db')) # 设置是否跟踪数据库的修改情况,一般不跟踪 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 数据库操作时是否显示原始SQL语句,一般都是打开的,因为我们后台要日志 app.config['SQLALCHEMY_ECHO'] = True db = SQLAlchemy(app) class Note(db.Model): id = db.Column(db.Integer, primary_key=True) body = db.Column(db.Text) def __repr__(self): return "Note %r" % self.body @app.cli.command() def initdb(): # 初始化数据库命令 db.create_all() click.echo("Initialized database finish.") ``` 使用命令`flask initdb` 命令创建新的数据库note.db。 ### SQLAlchemy的ORM组件配置 | 配置键 | 说明 | | ------------------------------ | ------------------------------------------------------------ | | SQLALCHEMY_DATABASE_URI | 用于连接数据的数据库。 | | SQLALCHEMY_BINDS | 一个映射绑定 (bind) 键到 SQLAlchemy 连接 URIs 的字典。 | | SQLALCHEMY_ECHO | 如果设置成 True,SQLAlchemy 将会记录所有发到标准输出(stderr)的语句,这对调试很有帮助。 | | SQLALCHEMY_RECORD_QUERIES | 可以用于显示地禁用或者启用查询记录。查询记录在调试或者测试模式下自动启用。 | | SQLALCHEMY_NATIVE_UNICODE | 可以用于显示地禁用支持原生的 unicode。这是某些数据库适配器必须的(像在 Ubuntu 某些版本上的 PostgreSQL),当使用不合适的指定无编码的数据库 默认值时。 | | SQLALCHEMY_POOL_SIZE | 数据库连接池的大小。默认是数据库引擎的默认值 (通常是 5)。 | | SQLALCHEMY_POOL_TIMEOUT | 指定数据库连接池的超时时间。默认是 10。 | | SQLALCHEMY_POOL_RECYCLE | 自动回收连接的秒数。这对 MySQL 是必须的,默认 情况下 MySQL 会自动移除闲置 8 小时或者以上的连接。 需要注意地是如果使用 MySQL 的话, Flask-SQLAlchemy 会自动地设置这个值为 2 小时。 | | SQLALCHEMY_MAX_OVERFLOW | 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。 | | SQLALCHEMY_TRACK_MODIFICATIONS | 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。 | ### 连接数据库服务器 要连接数据库服务器,首先需要为我们的程序指定数据库的URI(Uniform Resource Identifier)。数据库URI是一串包含各种属性的字符串,其中包含了各种用于连接数据库的信息。 | DBMS | URI | | -------------- | --------------------------------------------------------- | | PostgreSQL | postgresql://username:password@hostname:port/databasename | | MySQL | mysql://username:password@hostname:port/databasename | | SQLite(file) | sqlite:///path_to_db.db | | SQLite(memory) | sqlite:///:memory: | 在flask-sqlalchemy中,数据可的URI可以通过配置变量`SQLALCHEMY_DATABASE_URI`的值设置,默认为内存型SQLite数据库(sqlite:///:memory:)。 ```python import os app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///'+os.path.join(app.root_path, 'data.db')) ``` 从开发环境换到生产环境,更换到其他类型的DBMS,数据库URL会包含敏感信息,优先从环境变量`DATABASE_URL` 获取。 ### SQLAlchemy常用字段类型 | 字段类型 | python类型 | 说明 | | ------------ | --------------- | ----------------- | | Integer | int | 整型,4字节 | | SmallInteger | int | 短整型,2字节 | | BigInteger | int, long | 长整型 | | Float | float | 浮点数 | | Numeric | decimal.Decimal | 定点数 | | String | str | 字符串 | | Text | str | 变长字符串 | | Unicode | unicode | Unicode字符串 | | UnicodeText | unicode | 变长Unicode字符串 | | Boolean | bool | 布尔值 | | Date | datetime.date | 时间 | ### 常用SQLAlchemy列选项 | 选项 | 说明 | | ----------- | ------------------------------------------------- | | primary_key | 如果为True,代表表的主键 | | unique | 如果为True,代表这列不允许出现重复的值 | | index | 如果为True,为这列创建索引,提高查询效率 | | nullable | 如果为True,允许有空值,如果为False,不允许有空值 | | default | 默认值 | ### 常用SQLAlchemy关系选项 | 选项 | 说明 | | -------------- | ------------------------------------------------------------ | | backref | 在关系的另一模型中添加反向引用 | | primary join | 明确指定两个模型之间使用的联结条件 | | uselist | 如果为False,不使用列表,而使用标量值 | | order_by | 指定关系中记录的排序方式 | | secondary | 指定多对多中记录的排序方式 | | secondary join | 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 | ## 数据库操作CRUD ### 增(Create) 三步添加一条记录到数据库 1. 创建python对象,可通过实例化模型类,作为一条记录 2. 使用db.session.add()方法添加记录到数据库会话 3. 使用db.session.commit()提交数据库会话 ```python note1 = Note(body="wangyuedong111") note2 = Note(body="wangyuedong222") note3 = Note(body="wangyuedong333") db.session.add(note1) db.session.add(note2) db.session.add(note3) db.session.commit() #也可以使用add_all()添加一个列表对象 note_list = [note1, note2, note3] db.session.add_all(note_list) db.session.commit() ``` ### 查(Read) 一个完整的查询:`<模型类>.query.<过滤方法>.<查询方法>` 。 | 查询方法 | 说明 | | -------------- | ------------------------------------------------------------ | | all() | 返回包含所有查询记录的列表 | | first() | 返回查询的第一条记录,如果未找到,返回None | | one() | 返回查询的第一条记录,如果未找到,返回None | | get(id) | 传入主键值作为参数,返回指定主键值的记录,如果未找到,返回None | | count() | 返回查询结果的数量 | | one_or_none() | one() | | first_or_404() | | | get_or_404() | | | paginate() | 返回一个Paginate对象,可以对记录进行分页处理 | | 过滤方法 | 说明 | | -------------- | ------------------------------------------------------------ | | filter() | 使用指定的规则过滤记录,返回新的查询对象 | | filter_by() | 使用指定的规则过滤记录,可以使用关键字表达式,返回新的查询对象 | | order_by() | | | limit(limit) | | | group_by() | | | offset(offset) | | ```python # filter方法传入参数时可使用like,in,not in,and,or,!, == filter(Note.body.like("%foo%")) filter(Note.body.in_(["foo", "bar", "baz"])) filter(~Note.body.in_(["foo", "bar", "baz"])) from sqlalchemy import and_ filter(and_(Note.body == "foo", Note.title == "bar")) #也可以使用多个表达式逗号分开 filter(Note.body == "foo", Note.title == "bar") #也可以多个叠加 filter(Note.body == "foo").filter(Note.title == "bar") from sqlalchemy import or_ filter(or_(Note.body == "foo", Note.title == "bar")) #filter_by Note.query.filter_by(body="wangyuedong").all() #查询所有数据 results = session.query(User).all() #使用limit限制查询的数量,如只查询表中前10条数据 results = session.query(User).limit(10).all() #使用offset可以设置查询的偏移起始值,如要获取88到95行的数据 results = session.query(User).limit(8)offset(88).all() #使用slice也可以设置切片的起始值 resluts = session.query(User).slice(90, 100).all() ``` ### 改(Update) 更新一条记录只要两步 1. 直接赋值给模型类的字段属性改变字段值 2. 调用commit()提交 ```python note = Note.query.get(2) note.body = "wangyuedong___wudi" db.session.commit() ``` ### 删(Delete) 删除记录 1. 查询到要删除的对象 2. 使用db.session.delete() 3. 使用db.session.commit() ```python note = Note.query.get(2) db.session.delete(note) db.session.commit() ``` ### 排序 1. order_by:可以指定根据这个表中的某个字段进行排序,如果在前面加了一个 `-` ,代表的是降序排序。 2. 在模型定义的时候指定默认排序:有些时候,不想每次在查询的时候都指定排序的方式,可以在定义模型的时候就指定排序的方式。有以下两种方式: - relationship的order_by参数:在指定relationship的时候,传递order_by参数来指定排序的字段。 - 在模型定义中,添加代码 `__mapper_args__ = {"order_by": title}` (新版本SQLAlchemy被移除) 3. 正序排序与倒序排序:默认是使用正序排序。如果需要使用倒序排序,那么可以使用这个字段的`desc()`方法,或者是在排序的时候使用这个字段的字符串名字,然后在前面加一个负号。 ## 示例代码 ```python from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from config import DB_URI engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class Student(Base): __tablename__ = 'student' # 表名 id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(50)) age = Column(Integer) sex = Column(String(10)) Base.metadata.create_all() # 将模型映射到数据库中 #Create student = Student(name='Tony', age=18, sex='male') # 创建一个student对象 session.add(student) # 添加到session session.commit() # 提交到数据库 #Read item_list = session.query(Student).all() print(item_list) for item in item_list: print(item.name, item.age) item_list = session.query(Student.name).all() print(item_list) # [('Tony',), ('Jane',), ('Ben',)] item = session.query(Student.name).first() print(item) # ('Tony',) item_list = session.query(Student.name).filter(Student.age >= 18).all() print(item_list) # [('Tony',), ('Ben',)] item_list = session.query(Student.name, Student.age).order_by(Student.age.desc()).all() # desc()表示倒序 print(item_list) # [('Ben', 20), ('Tony', 18), ('Jane', 16)] # 默认为and, 在filter()中用,分隔多个条件表示and item_list = session.query(Student.name, Student.age, Student.sex).filter( Student.age >= 10, Student.sex == 'female' ).all() print(item_list) # [('Jane', 16, 'female')] from sqlalchemy import or_ # 使用or_连接多个条件 item_list = session.query(Student.name, Student.age, Student.sex).filter( or_(Student.age >= 20, Student.sex == 'female') ).all() print(item_list) # [('Jane', 16, 'female'), ('Ben', 20, 'male')] count = session.query(Student).count() print(count) # 3 #Update # 修改Tony的age为22 session.query(Student).filter(Student.name == 'Tony').update({'age': 22}) session.commit() item = session.query(Student.name, Student.age).filter(Student.name == 'Tony').first() print(item) #Delete # 删除名称为Ben的数据 session.query(Student).filter(Student.name == 'Ben').delete() session.commit() item_list = session.query(Student.name, Student.age).all() print(item_list) ``` ## 建立表间关系 ### 一对多关系 如一个作者对应多篇文章的关系。 1. 在多侧(Article表)使用`db.ForeignKey('author.id')`定义外键,需要建立新的字段; 2. 在一侧(Author表)使用`db.relationship('Article')`定义关系属性,不会建立新的字段; 3. 建立关系,一种方式为外键赋值,另一种方式操作关系属性:一的对象的关系属性append多的对象,通过remove方法也可以和关系对象解除关系。 4. 建立双向关系,并不必须。建立双向关系后只需要在一侧操作关系对象,另一侧关系对象会自动生成。 常用的关系函数参数表 | 参数名 | 说明 | | -------------- | ------------------------------------------ | | back_populates | 为反向关系所对应的属性进行命名。 | | backref | 自动为关系另一侧添加关系属性,简化双向绑定 | | lazy | 指定如何加载关系记录 | | userlist | True返回全部记录,False返回单个记录 | | secondary | 多对多的关联表 | | cascade | 级联 | [详细解释](https://docs.sqlalchemy.org/en/14/orm/basic_relationships.html#relationship-patterns) | lazy参数值 | 说明 | | ---------- | ------------------ | | select | 一次性加载,默认值 | | joined | | | inher | | | subquery | | | dynamic | | ```python # 使用back_populates属性双向绑定 class Writer(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(70), unique=True) books = db.relationship('Book', back_populates='writer') class Book(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(50), index=True) writer_id = db.Column(db.Integer, db.ForeignKey('writer.id')) writer = db.relationship('Writer', back_populates='books') # 使用backref属性双向绑定 class Singer(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(70), unique=True) songs = db.relationship('Song', backref='singer') class Song(db.Model): id = db.Column(db.Integer, primary_key=True) song_name = db.Column(db.String(50), index=True) singer_id = db.Column(db.Integer, db.ForeignKey('singer.id')) # 显式好于隐式,最好双向显式表达 ``` ### 多对一关系 ```python class Citizen(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(70), unique=True) city_id = db.Column(db.Integer, db.ForeignKey('city.id')) city = db.relationship('City') class City(db.Model): id = db.Column(db.Integer, primary_key=True) city_name = db.Column(db.String(50), index=True) ``` 从一定角度看,一对多的关系和多对一的关系是一样的。 ### 一对一关系 确保关系两侧的关系属性都是标量属性。在示例程序中,Country类表示国家,Capital类表示首都。建立一对一关系后,我们将在Country类中创建一个标量关系属性capital,调用它会获取单个Capital对象;我们还将在Capital类中创建一个标量关系属性country,调用它会获取单个的Country对象。一对一关系实际上是通过建立双向关系的一对多关系的基础上转化而来。我们要确保关系两侧的关系属性都是标量属性,都只返回单个值,所以要在定义集合属性的关系函数中将uselist参数设为False,这时一对多关系将被转换为一对一关系。 ```python class Country(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(30), unique=True) capital = db.relationship('Capital', uselist=False) class Capital(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50), index=True) country_id = db.Column(db.Integer, db.ForeignKey('country.id')) country = db.relationship('Country') ``` ### 多对多关系 多对多需要创建一个关联表(association table),关联表不存储数据,只存储两侧外键的对应关系。通过关联表将多对多关系变成两个一对多关系。 ```python association_table = db.Table('association', db.Column('student_id', db.Integer, db.ForeignKey('student.id')), db.Column('teacher_id', db.Integer, db.ForeignKey('teacher.id'))) class Student(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(70), unique=True) grade = db.Column(db.String(70)) teachers = db.relationship('Teacher', secondary=association_table, back_populates='students') class Teacher(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(70), unique=True) office = db.Column(db.String(20)) students = db.relationship('Student', secondary=association_table, back_populates='teachers') ``` ## 更新数据库 ### 重新生成表 ```python # 删除表连同表的所有字段和所有数据 db.drop_all() db.create_all() ``` ### 使用flask-migrate迁移数据库 ```python pip install flask-migrate ``` 使用 ```python from flask import Flask from flask_sqlalchemy import SQLAlchemy from flase_migrate import Migrate app = Flask(__name__) db = SQLAlchemy(app) migrate = Migrate(app, db) ``` 使用命令 ```bash #创建迁移环境 flask db init #生成迁移脚本 flask db migrate -m "add note timestamp" #更新数据库 flask db upgrade ``` ## 事件监听 ```python @db.even.listens_for(Draft.body, 'set') def increment_edit_time(target, value, oldvalue, initator): if target.edit_time is not None: target.edit_time += 1 @db.even.listens_for(Draft.body, 'set', named=True) def increment_edit_time(**kwargs): if kwargs['target'].edit_time is not None: target.edit_time += 1 ```