Python微信订餐小程序课程视频

https://edu.csdn.net/course/detail/36074

Python实战量化交易理财系统

https://edu.csdn.net/course/detail/35475

最近想要学习SQLAlchemy, 发现网上的中文文档大多是机翻的, 读起来特别变扭, 因此对照着最新的英文文档梳理了一遍, 写下来记录一下
目前SQLAlchemy的版本为1.4.x, 风格处于1.x过渡到2.0的时代. 为了尽量让这篇文章的兼容之后的版本, 本文将讲述1.x和2.0两种风格的接口(主要是查询的接口)

其实在2.0风格中, 主要受到影响的是ORM的查询方式, 详情见文档: 2.0 Migration - ORM Usage

安装

[fast →](https://blog.csdn.net/biggbang)

检测sqlalchemy版本:

[fast →](https://blog.csdn.net/biggbang)

使用步骤

一般来说SQLAlchemy的使用方式有两种: CoreORM
两种有什么不同呢?

  1. ORM是构建在Core之上的
  2. Core更加底层, 可以执行直接执行SQL语句
  3. ORM类似于Django的ORM, 由于sqlalchemy提供了一套接口, 所以不需要我们直接写SQL语句 (1.x版本)
  4. 至于要用哪个, 等到你用到时, 你会知道的

组件依赖关系图:

Core

一般来说, 使用步骤如下:

  1. 配置数据库连接
  2. 建立连接
  3. 创建表
  4. 执行SQL语句, 按需开启事件是否自动提交
  5. 拿到返回数据, 执行其他代码

数据库的连接的格式

我们在创建引擎(连接)时, 需要指定数据库的URL, URL格式, 见: Engine Configuration, 总的来说, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]

  • dialect 数据库名称(方言): 如mysql
  • driver 连接数据库的库: 如: pymysql
  • user 用户名
  • password 密码
  • host 地址
  • dbname 数据库名称
  • key=value 指的是给数据库的参数

如下面的URL:

|  | mysql+pymysql://root:passwd@127.0.0.1:3306/test\_db?charset=utf8 |

建立连接

调用sqlalchemy.create_engine方法, 为了兼容2.0风格的接口, 可以加上future参数. 至于什么是2.0风格的接口, 可以看看官方文档: 2.0 style
create_engine有几个参数需要我们注意:

  • url 即数据库url, 其格式见上文: 数据库的连接的格式
  • echo参数为True时, 将会将engine的SQL记录到日志中 ( 默认输出到标准输出)
  • echo_poolTrue时,会将连接池的记录信息输出
  • future 使用2.0样式EngineConnection API

更多参数见官方文档: sqlalchemy.create_engine

例子

|  | from sqlalchemy import create\_engine |
|  |  |
|  | # 兼容2.0的写法 |
|  | # 返回对象不一样 |
|  | engine1 = create\_engine("sqlite+pysqlite:///:memory:", echo=True, future=True) |
|  | print(type(engine1)) |
|  | #  |
|  |  |
|  | engine2 = create\_engine("sqlite+pysqlite:///:memory:", echo=True) |
|  | print(type(engine2)) |
|  | #  |

注意, 由于sqlalchemy使用lazy initialization的策略连接数据库, 故此时还未真正地连接上数据库

创建表

我们想要让数据库创建一个表, 需要利用MetaData对象, 关于一些常用的MetaData方法, 见: MetaData
除了要MetaData对象外, 我们还需要Table对象, 用于定义一个表的结构
Table的一般使用

|  | mytable = Table("mytable", metadata, |
|  |  Column('mytable\_id', Integer, primary\_key=True), |
|  |  Column('value', String(50)) |
|  |  ) |

Table的参数:

  • name 表名称
  • metadata 该表所属的MetaData对象
  • 其他参数: 通过Column指定一列数据, 格式见: Column定义

例子:

|  | from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey |
|  | from sqlalchemy import create\_engine, text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | metadata\_obj = MetaData() |
|  |  |
|  | user\_table = Table( |
|  | "user\_account", |
|  |  metadata\_obj, |
|  |  Column('id', Integer, primary\_key=True), |
|  |  Column("username", String(30))) # String也可以不实例化 |
|  |  |
|  | # 第二个表 |
|  | address\_table = Table( |
|  | "address", |
|  |  metadata\_obj, |
|  |  Column("id", Integer, primary\_key=True), |
|  | # 定义外键 |
|  |  Column("uid", ForeignKey("user\_account.id"), nullable=False), |
|  |  Column('email\_address', String(32), nullable=False) |
|  | ) |
|  | # 相当于执行 CREATE TABLE 语句 |
|  | metadata\_obj.create\_all(engine) |
|  |  |
|  | """ |
|  | -- 相当于: |
|  | CREATE TABLE user\_account ( |
|  |  id INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  username VARCHAR(30),  |
|  |  PRIMARY KEY (id) |
|  | ); |
|  | CREATE TABLE address ( |
|  |  id INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  uid INTEGER NOT NULL,  |
|  |  email\_address VARCHAR(32) NOT NULL,  |
|  |  PRIMARY KEY (id),  |
|  |  FOREIGN KEY(uid) REFERENCES user\_account (id) |
|  | ) |
|  | """ |

create_all方法, 默认会在创建表之间检测一下表是否存在, 不存在时才创建.

Table的一些属性

|  | # ---------- 访问所有列 |
|  | # .c => Column |
|  | print(user\_table.c.keys()) |
|  | # ['id', 'username'] |
|  |  |
|  | # ---------- 访问某一列 |
|  | print(repr(user\_table.c.username)) |
|  | # Column('username', String(length=30), table=) |
|  |  |
|  | # ---------- 返回主键 |
|  | print(user\_table.primary\_key) |
|  | # 隐式生成 |
|  | # PrimaryKeyConstraint(Column('id', Integer(), table=, primary\_key=True, nullable=False)) |

在事务中执行SQL

通常, 我们通过调用engine.connectengine.begin方法开始一个事件
sqlalchemy使用事务有两种风格commit as you goBegin once, 前者需要我们手动提交, 后者会自动提交

手动提交

engine.connect方法符合python的上下文管理协议, 会返回一个Connection对象, 该方法会在不手动提交的情况下回滚.举个例子:

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy import text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  | with engine.connect() as conn: |
|  | # 执行 |
|  |  result = conn.execute(text("select 'hello world'")) # text 可以使用SQL语句 |
|  |  print(result.all()) |
|  | # conn.commit() |
|  | # [('hello world',)] |
|  |  |
|  | # 最后会ROLLBACK |

上面的代码中, 相当于开启了事务, 由于最后没有调用commit方法, 所以会回滚.

自动提交

engine.begin方法也符合python的上下文管理协议, 只要执行时不报错就会自动提交, 报错时会回滚.

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy import text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  | with engine.begin() as conn: |
|  |  result = conn.execute(text("select 'hello world'")) |
|  |  print(result.all()) |
|  | # [('hello world',)] |
|  |  |
|  | # COMMIT |

绑定参数

上面在事务中执行SQL语句时, 我们用到了sqlalchemy.text, 可以直接定义文本SQL字符串
为了避免被SQL注入, 故在需要传入参数的场景中需要根据sqlalchemy的方式传入, 而不是直接拼接成字符串.
使用:y的格式定义参数, 且将值以字典的形式传给execute

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy import text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  | with engine.begin() as conn: |
|  |  result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"}) |
|  |  print(result.all()) |
|  | # [('lczmx',)] |
|  |  |
|  | # COMMIT |
|  |  |

多个参数时, 可以这样

|  | with engine.connect() as conn: |
|  |  conn.execute( |
|  |  text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"), |
|  |  [{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}]) |
|  |  conn.commit() |

这种方式也可以

|  | stmt = text("SELECT x, y FROM some\_table WHERE y > :y ORDER BY x, y").bindparams(y=6) |
|  |  |
|  | with engine.connect() as conn: |
|  |  conn.execute(stmt) |
|  |  conn.commit() |

增删改查

处理使用text直接执行SQL外, 你还可以使用其他语法增删改查数据
假如表结构如下:

`|  |  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |  |
|  |
|  | $ |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |
|  |`[fast →](https://blog.csdn.net/biggbang)show create table address; +---------+-----------------------------------------+ | | Table | Create Table | | +---------+-----------------------------------------+ | | address | CREATE TABLE `address` ( |  `id` int NOT NULL AUTO\_INCREMENT, |  `uid` int NOT NULL, |  `email\_address` varchar(32) NOT NULL, |  PRIMARY KEY (`id`), |  KEY `uid` (`uid`), |  CONSTRAINT `address\_ibfk\_1` FOREIGN KEY (`uid`) REFERENCES `user\_account` (`id`) | ) ENGINE=InnoDB AUTO\_INCREMENT=2 DEFAULT CHARSET=gbk | | +---------+------------------------------------------+ |  | +--------------+------------------------------------+ | | Table | Create Table | | +--------------+------------------------------------+ | | user\_account | CREATE TABLE `user\_account` ( |  `id` int NOT NULL AUTO\_INCREMENT, |  `username` varchar(30) DEFAULT NULL, |  PRIMARY KEY (`id`) | ) ENGINE=InnoDB AUTO\_INCREMENT=6 DEFAULT CHARSET=gbk | | +--------------+-------------------------------------+ | 1 row in set (0.00 sec) |  |  |[restart ↻](https://blog.csdn.net/biggbang)

插入数据

使用insert(...).values(...)形式为数据库插入数据

|  | from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey |
|  | from sqlalchemy import create\_engine, insert |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | metadata\_obj = MetaData() |
|  |  |
|  | user\_table = Table( |
|  | "user\_account", |
|  |  metadata\_obj, |
|  |  Column('id', Integer, primary\_key=True), |
|  |  Column("username", String(30))) # String也可以不实例化 |
|  |  |
|  | # 第二个表 |
|  | address\_table = Table( |
|  | "address", |
|  |  metadata\_obj, |
|  |  Column("id", Integer, primary\_key=True), |
|  |  Column("uid", ForeignKey("user\_account.id"), nullable=False), |
|  |  Column('email\_address', String(32), nullable=False) |
|  | ) |
|  |  |
|  | metadata\_obj.create\_all(bind=engine) |
|  |  |
|  | with engine.connect() as conn: |
|  | # 插入一条普通数据 |
|  |  conn.execute(insert(user\_table).values(id=1, username="lczmx")) |
|  | # 插入外键等数据 |
|  |  conn.execute(insert(address\_table).values(uid=1, email\_address="lczmx@foxmail.com")) |
|  |  |
|  | # 自动生成value, 不需要我们手动指定 |
|  |  |
|  |  conn.execute(insert(user\_table), |
|  |  [{"username": "张三"}, |
|  |  {"username": "李四"}, |
|  |  {"username": "王五"}, |
|  |  {"username": "赵六"}, |
|  |  ]) |
|  |  |
|  |  conn.commit() |
|  |  |

SQLAlchemy还提供了更复杂的用法, 见: Inserting Rows with Core

注意: 插入数据没有返回值

删除数据

使用delete(...).where(...)的形式删除数据

目前的表数据:

|  | select u.id as uid, u.username, a.id as aid, a.email\_address as email\_address  |
|  | from user\_account as u  |
|  | left join address as a on u.id=a.uid; |
|  | +-----+----------+------+-------------------+ |
|  | | uid | username | aid | email\_address | |
|  | +-----+----------+------+-------------------+ |
|  | | 1 | lczmx | 1 | lczmx@foxmail.com | |
|  | | 2 | 张三 | NULL | NULL | |
|  | | 3 | 李四 | NULL | NULL | |
|  | | 4 | 王五 | NULL | NULL | |
|  | | 5 | 赵六 | NULL | NULL | |
|  | +-----+----------+------+-------------------+ |
|  |  |

例子:

|  | from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey |
|  | from sqlalchemy import create\_engine, delete |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | metadata\_obj = MetaData() |
|  |  |
|  | user\_table = Table( |
|  | "user\_account", |
|  |  metadata\_obj, |
|  |  Column('id', Integer, primary\_key=True), |
|  |  Column("username", String(30))) # String也可以不实例化 |
|  |  |
|  | # 第二个表 |
|  | address\_table = Table( |
|  | "address", |
|  |  metadata\_obj, |
|  |  Column("id", Integer, primary\_key=True), |
|  |  Column("uid", ForeignKey("user\_account.id"), nullable=False), |
|  |  Column('email\_address', String(32), nullable=False) |
|  | ) |
|  |  |
|  | metadata\_obj.create\_all(bind=engine) |
|  |  |
|  | with engine.connect() as conn: |
|  | # 一般删除 |
|  | # user\_table.c 获取的是 列数据 |
|  |  result1 = conn.execute(delete(user\_table).where(user\_table.c.id == 3)) |
|  |  print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1 |
|  |  |
|  | # and 删除 |
|  |  result2 = conn.execute(delete(user\_table).where(user\_table.c.username == "张三", user\_table.c.id == 2)) |
|  |  print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1 |
|  |  |
|  |  conn.commit() |
|  |  |

.rowcount属性获取受影响的行数

更多见: The delete() SQL Expression Construct

更新数据

使用update(...).where(...).values(...)的形式更新数据

|  | select u.id as uid, u.username, a.id as aid, a.email\_address as email\_address  |
|  | from user\_account as u  |
|  | left join address as a on u.id=a.uid; |
|  |  |
|  | +-----+----------+------+-------------------+ |
|  | | uid | username | aid | email\_address | |
|  | +-----+----------+------+-------------------+ |
|  | | 1 | lczmx | 1 | lczmx@foxmail.com | |
|  | | 2 | 张三 | NULL | NULL | |
|  | | 3 | 李四 | NULL | NULL | |
|  | | 4 | 王五 | NULL | NULL | |
|  | | 5 | 赵六 | NULL | NULL | |
|  | +-----+----------+------+-------------------+ |
|  |  |

例子:

|  | from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey |
|  | from sqlalchemy import create\_engine, update, bindparam, select |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8". |
|  | format(**DATABASE\_CONFIG), echo=True, future=True) |
|  | metadata\_obj = MetaData() |
|  |  |
|  | user\_table = Table( |
|  | "user\_account", |
|  |  metadata\_obj, |
|  |  Column('id', Integer, primary\_key=True), |
|  |  Column("username", String(30))) # String也可以不实例化 |
|  |  |
|  | # 第二个表 |
|  | address\_table = Table( |
|  | "address", |
|  |  metadata\_obj, |
|  |  Column("id", Integer, primary\_key=True), |
|  |  Column("uid", ForeignKey("user\_account.id"), nullable=False), |
|  |  Column('email\_address', String(32), nullable=False) |
|  | ) |
|  |  |
|  | metadata\_obj.create\_all(bind=engine) |
|  |  |
|  | with engine.connect() as conn: |
|  | # 一般更新 |
|  |  result1 = conn.execute(update(user\_table).where( |
|  |  user\_table.c.username == "王五").values(username="王老五")) |
|  |  print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1 |
|  |  |
|  | # 更新数据 加上 原来的数据 |
|  |  result2 = conn.execute( |
|  |  update(user\_table).where(user\_table.c.username == "赵六").values( |
|  |  username=user\_table.c.username + "一号")) |
|  |  print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1 |
|  |  |
|  | # 以字典的形式, 替换更新多个值 |
|  |  result3 = conn.execute( |
|  |  update(user\_table).where(user\_table.c.username == bindparam('old\_name')).values( |
|  |  username=bindparam('new\_name')), |
|  |  [ |
|  |  {"old\_name": "张三", "new\_name": "新张三"}, |
|  |  {"old\_name": "李四", "new\_name": "新李四"}, |
|  |  ] |
|  |  ) |
|  |  |
|  |  print(f"受影响行数: {result3.rowcount}") # 受影响行数: 2 |
|  |  |
|  | # 以 子查询 的方式 更新数据 |
|  |  scalar\_subq = ( |
|  |  select(address\_table.c.email\_address). |
|  |  where(address\_table.c.uid == user\_table.c.id). |
|  |  order\_by(address\_table.c.id). |
|  |  limit(1). |
|  |  scalar\_subquery() |
|  |  ) |
|  | # 将email\_address的值 赋给 username |
|  |  update(user\_table).values(username=scalar\_subq) |
|  |  |
|  | """ |
|  |  -- 以上查询, 相当于: |
|  |  UPDATE user\_account SET username=(SELECT address.email\_address |
|  |  FROM address |
|  |  WHERE address.uid = user\_account.id ORDER BY address.id |
|  |  LIMIT :param\_1) |
|  |  """ |
|  |  conn.commit() |
|  |  |

修改后的结果:

|  | +-----+----------+------+-------------------+ |
|  | | uid | username | aid | email\_address | |
|  | +-----+----------+------+-------------------+ |
|  | | 1 | lczmx | 1 | lczmx@foxmail.com | |
|  | | 2 | 新张三 | NULL | NULL | |
|  | | 3 | 新李四 | NULL | NULL | |
|  | | 4 | 王老五 | NULL | NULL | |
|  | | 5 | 赵六一号 | NULL | NULL | |
|  | +-----+----------+------+-------------------+ |
|  |  |

更多见: Updating and Deleting Rows with Core

查询数据

由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

处理查询返回的数据

我们执行conn.execute方法的结果为: CursorResult对象
其本质上是继承与Result对象, 其使用方式见: Result

例子:
假如查询的表:

|  | mysql> select * from user\_account; |
|  | +----+----------+ |
|  | | id | username | |
|  | +----+----------+ |
|  | | 9 | lczmx | |
|  | | 10 | jack | |
|  | | 11 | tom | |
|  | | 12 | mike | |
|  | +----+----------+ |
|  | 4 rows in set (0.00 sec) |
|  |  |
|  | mysql> |
|  |  |

利用SQLAlchemy获取数据:

|  | from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey |
|  | from sqlalchemy import create\_engine, text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  | with engine.connect() as conn: |
|  | # 执行 |
|  |  result = conn.execute(text("select * from user\_account;")) |
|  |  |
|  | for row in result.all(): |
|  | # 使用f-strings 格式化字符串 |
|  |  print(f"id: {row.id:3}, username: {row.username:20}") |
|  | # 打印的结果: |
|  | """ |
|  |  id: 9, username: lczmx  |
|  |  id: 10, username: jack  |
|  |  id: 11, username: tom  |
|  |  id: 12, username: mike  |
|  |  """ |
|  |  conn.commit() |
|  |  |

ORM

和Core一样, ORM也有一定的使用步骤:

  1. 配置数据库连接, 见上文: 数据库的连接的格式
  2. 创建会话
  3. 创建表
  4. 使用接口, 增删改查数据
  5. 拿到返回数据, 执行其他代码

在学习SQLAlcehmy的ORM之前, 建议先了解一些概念, 以免后面会混淆

  1. 会话 Session
    会话是SQLAlchemy ORM与数据库的交互对象
    它可以管理建立连接中engine, 并为通过会话加载或与会话关联的对象提供标识映射 (identity map)
    在使用时与Connection非常相似, 你可以对比着使用
  2. Base
    通过sqlalchemy.orm.declarative_base创建
    作为定义表的基类, 内部有包含MetaData对象
    可以类似于Django一样定义表

SQLAlchemy中, session是一个连接池, 的由其管理, 因此, 假如我们需要操作数据库的话, 需要在session中拿到Connection(连接)

创建会话

SQLAlchemy提供了两种创建会话的方法:

  1. sqlalchemy.orm.Session
|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy.orm import Session |
|  |  |
|  | # 创建引擎 |
|  | engine = create\_engine('postgresql://scott:tiger@localhost/') |
|  |  |
|  | # 创建会话 |
|  | # 以下with可以简写成 with Session(engine) as session, session.begin(): |
|  | with Session(engine) as session: |
|  | # 开启自动提交 |
|  | with session.begin(): |
|  | # add方法 会将some\_object 保存到数据库 |
|  | # session.add(some\_object) |
|  | # session.add(some\_other\_object) |
|  | pass |
|  |  |
  1. sqlalchemy.orm.sessionmaker
|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy.orm import sessionmaker |
|  |  |
|  | # 创建引擎 |
|  | engine = create\_engine('postgresql://scott:tiger@localhost/') |
|  |  |
|  | # 创建session |
|  | Session = sessionmaker(engine) |
|  |  |
|  | # 一般使用 |
|  | with Session() as session: |
|  | # session.add(some\_object) |
|  | # session.add(some\_other\_object) |
|  | # 提交 |
|  |  session.commit() |
|  |  |
|  | # 自动提交 |
|  | with Session.begin() as session: |
|  | # session.add(some\_object) |
|  | # session.add(some\_other\_object) |
|  | pass |
|  |  |

虽然有两种方法创建会话, 但我们一般使用sessionmaker创建会话

另外补充一下session的其它使用方式:

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy.orm import sessionmaker |
|  |  |
|  | engine = create\_engine('postgresql://scott:tiger@localhost/') |
|  |  |
|  | Session = sessionmaker(engine) |
|  |  |
|  | # 从连接指定到session |
|  | with engine.connect() as connection: |
|  | with Session(bind=connection) as session: |
|  | # 一些操作 |
|  | pass |

下面列出session的一些常用方法, 增删改查数据时要用到

方法 参数 描述
add instance 下次刷新操作时, 将 instance 保留到数据库中
delete instance 下次刷新操作时, 将instance从数据库中删除
begin subtransactions nested _subtrans 开始事务
rollback 回滚当前事务
commit 提交当前事务
close 关闭此Session
execute statement params execution_option bind_arguments 执行SQL表达式构造
query *entities **kwargs 返回Query对象, 可用于查询数据
refresh instance attribute_names with_for_update instance执行刷新操作

例子:

|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import Session |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  | stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1) |
|  | with Session(engine) as session: |
|  |  result = session.execute(stmt) |
|  |  print(result.all()) |
|  | # [(2, 'name2'), (3, 'name2')] |
|  |  |
|  | # ROLLBACK |
|  |  |

在ORM中创建表

使用ORM时, 我们也需要MetaData, 不同的是, 我们是通过sqlalchemy.orm.registry构造的. 而且, 我们不需要像Core那样直接声明Table, 而是继承某个公共基类 (Base), 添加属性即可. 有两种方式定义基类.
方式一:

|  | from sqlalchemy.orm import registry |
|  | mapper\_registry = registry() |
|  | print(mapper\_registry.metadata) # MetaData对象 |
|  | # 公共基类 |
|  | Base = mapper\_registry.generate\_base() |

方法二:

|  | from sqlalchemy.orm import declarative\_base |
|  |  |
|  | # 内部 return registry(...).generate\_base(...) |
|  | Base = declarative\_base() |
|  |  |

现在你可以像在Django ORM中一样, 定义表并在数据库中创建表, 每一个Column表示一列数据, 关于Column的写法, 见: Column定义

|  | from sqlalchemy import Column, String, Integer, create\_engine, SMALLINT, Boolean, ForeignKey |
|  | from sqlalchemy.orm import relationship, declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Student(Base): |
|  |  \_\_tablename\_\_ = "student" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  name = Column("name", String(32), nullable=False, index=True, comment="姓名") |
|  |  age = Column("age", SMALLINT, nullable=False, comment="年龄") |
|  |  gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女") |
|  |  |
|  |  |
|  | class Course(Base): |
|  |  \_\_tablename\_\_ = "course" |
|  |  cid = Column("cid", Integer, primary\_key=True) |
|  |  name = Column("name", String(10), nullable=False, comment="科目名") |
|  |  tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师") |
|  |  |
|  |  |
|  | class Teacher(Base): |
|  |  \_\_tablename\_\_ = "teacher" |
|  |  tid = Column("tid", Integer, primary\_key=True) |
|  |  name = Column("name", String(10), nullable=False, comment="教师名") |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | """ |
|  | -- 对于sql |
|  | CREATE TABLE student ( |
|  |  sid INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  name VARCHAR(32) NOT NULL COMMENT '姓名',  |
|  |  age SMALLINT NOT NULL COMMENT '年龄',  |
|  |  gender BOOL NOT NULL COMMENT '性别, True: 男, False: 女',  |
|  |  PRIMARY KEY (sid) |
|  | ) |
|  | CREATE TABLE teacher ( |
|  |  tid INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  name VARCHAR(10) NOT NULL COMMENT '教师名',  |
|  |  PRIMARY KEY (tid) |
|  | ) |
|  | CREATE TABLE course ( |
|  |  cid INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  name VARCHAR(10) NOT NULL COMMENT '科目名',  |
|  |  tid INTEGER COMMENT '课程教师',  |
|  |  PRIMARY KEY (cid),  |
|  |  FOREIGN KEY(tid) REFERENCES teacher (tid) |
|  | ) |
|  | CREATE TABLE score ( |
|  |  sid INTEGER NOT NULL AUTO\_INCREMENT,  |
|  |  score SMALLINT NOT NULL COMMENT '成绩',  |
|  |  student\_id INTEGER COMMENT '成绩所属学生',  |
|  |  course\_id INTEGER COMMENT '成绩所属科目',  |
|  |  PRIMARY KEY (sid),  |
|  |  FOREIGN KEY(student\_id) REFERENCES student (sid),  |
|  |  FOREIGN KEY(course\_id) REFERENCES course (cid) |
|  | ) |
|  |   |
|  | """ |

Base.metadataMetaData对象, 常用的MetaData方法见: MetaData

注: 你通过Student.__table__属性可以查看Table, 也可以通过Student.name访问某一列
你也可以通过__init__显示定义某些列

增删改查数据

插入数据

接上文 “在ORM中创建表” 中的表

1.x的接口与2.0的接口一样, 都是调用session.add(instance)方法添加到数据库 (add方法下次刷新操作时, 将instance保存到数据库)
注意: 自动生成的数据, 在未插入到数据库之前, 都为None, 如: 自动生成的主键

你也可以调用add_all(instance1, instance2, ...)方法, 区别只是插入一条和多条数据而已

|  | from sqlalchemy import Column, String, Integer, create\_engine, SMALLINT, Boolean, ForeignKey |
|  | from sqlalchemy.orm import relationship, declarative\_base, sessionmaker |
|  | from sqlalchemy.orm import Session |
|  | from typing import Any |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Student(Base): |
|  |  \_\_tablename\_\_ = "student" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  name = Column("name", String(32), nullable=False, index=True, comment="姓名") |
|  |  age = Column("age", SMALLINT, nullable=False, comment="年龄") |
|  |  gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女") |
|  |  |
|  |  |
|  | class Course(Base): |
|  |  \_\_tablename\_\_ = "course" |
|  |  cid = Column("cid", Integer, primary\_key=True) |
|  |  name = Column("name", String(10), nullable=False, comment="科目名") |
|  |  tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师") |
|  |  |
|  |  |
|  | class Teacher(Base): |
|  |  \_\_tablename\_\_ = "teacher" |
|  |  tid = Column("tid", Integer, primary\_key=True) |
|  |  name = Column("name", String(10), nullable=False, comment="教师名") |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  |  |
|  | # 一般将 添加到数据库 封装成一个函数 |
|  | def create\_data(db: Session, target\_cls: Any, **kwargs): |
|  | try: |
|  |  cls\_obj = target\_cls(**kwargs) |
|  | # 添加一个 |
|  |  db.add(cls\_obj) |
|  | # 添加多个: |
|  | # db.add\_all([obj1, obj2, ...]) |
|  |  |
|  |  db.commit() |
|  | # 手动将 数据 刷新到数据库 |
|  |  db.refresh(cls\_obj) |
|  | return cls\_obj |
|  | except Exception as e: |
|  | # 别忘记发生错误时回滚 |
|  |  db.rollback() |
|  | raise e |
|  |  |
|  |  |
|  | session = SessionLocal() |
|  |  |
|  | # -------------- 创建学生数据 |
|  | student = create\_data(session, Student, sid=1, name="张三", age=22, gender=True) |
|  |  |
|  | # -------------- 创建教师数据 |
|  | teacher = create\_data(session, Teacher, tid=1, name="语文老师") |
|  |  |
|  | # -------------- 创建课程数据 |
|  | course = create\_data(session, Course, cid=1, name="语文", tid=teacher.tid) |
|  |  |
|  | # -------------- 创建成绩数据 |
|  | score = create\_data(session, Score, sid=1, score=89, student\_id=student.sid, course\_id=course.cid) |
|  |  |

注意: 自动生成主键时, 只有在刷新到数据库中后, 才能获取主键

总的来说, 插入数据代码一般为:

|  | # 1. 实例化一个表类 |
|  | db\_city = CityTable(....) |
|  |  |
|  | # 2. 调用session的add方法 |
|  | session.add(db\_city) |
|  |  |
|  | # 3. 调用session的commit方法 提交事务 |
|  | session.commit() |
|  |  |
|  | # 4. 手动调用session的refresh方法 将数据刷新到数据库 |
|  | session.refresh(db\_city) |
|  |  |

删除数据

1.x的方法

主要步骤是先查询再删除, 一般形式为: session.query(...).filter(...).delete()

|  | from sqlalchemy import Column, Integer, create\_engine, SMALLINT, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # 方法一: 调用 session.delete 方法 |
|  |  s = session.query(Score).filter(Score.score == 59).first() |
|  |  session.delete(s) |
|  |  |
|  | # 方法二: 查询后直接删除 |
|  |  session.query(Score).filter(Score.score == 59).delete() |
|  |  session.commit() |
|  |  |

2.0的方法

像Core一样删除数据, 即delte(...).where(...)

|  | from sqlalchemy import Column, Integer, create\_engine, SMALLINT, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  | from sqlalchemy import select, delete |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  |  session.execute( |
|  |  delete(Score).where(Score.sid == 1) |
|  |  ) |
|  |  session.commit() |
|  |  |

修改数据

1.x的方法

主要步骤是先查询再更新, 即: session.query(...).filter(...).update(...)

|  | from sqlalchemy import Column, Integer, create\_engine, SMALLINT, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  |  row = session.query(Score).filter(Score.score == 59).update({"score": 60}) |
|  |  |
|  |  print(f"修改的行数: {row}") |
|  |  session.commit() |
|  |  |

2.0的方法

同样和Core一样, 使用update(...).where(...).values(...)的形式更新数据

|  | from sqlalchemy import Column, Integer, create\_engine, SMALLINT, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  | from sqlalchemy import update, bindparam |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class Score(Base): |
|  |  \_\_tablename\_\_ = "score" |
|  |  sid = Column("sid", Integer, primary\_key=True) |
|  |  score = Column("score", SMALLINT, nullable=False, comment="成绩") |
|  |  student\_id = Column("student\_id", ForeignKey("student.sid"), comment="成绩所属学生") |
|  |  course\_id = Column("course\_id", ForeignKey("course.cid"), comment="成绩所属科目") |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # 一般更新 |
|  |  result1 = session.execute(update(Score).where(Score.score == 59).values(score=60)) |
|  |  print(f"受影响行数: {result1.rowcount}") |
|  |  |
|  | # # 更新数据 加上 原来的数据 |
|  |  result2 = session.execute( |
|  |  update(Score).where(Score.score == 59).values(score=Score.score + 1)) |
|  |  print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1 |
|  |  |
|  | # 以字典的形式, 替换更新多个值 |
|  |  result3 = session.execute( |
|  |  update(Score).where(Score.score == bindparam('old\_score')).values(score=bindparam('new\_score')), |
|  |  [ |
|  |  {"old\_score": 59, "new\_score": 60}, |
|  |  ] |
|  |  ) |
|  |  |
|  |  print(f"受影响行数: {result3.rowcount}") |
|  |  |
|  |  session.commit() |
|  |  |

同样.rowcount属性获取受影响行数

查询数据

1.x的方法

在SQLAlchemy1.x查询方式中, 使用Query对象进行查询, 类似于Django ORM的管理器, 可以较为简单地查询数据
假如要查询的表如下:

|  | class User(Base): |
|  |  \_\_tablename\_\_ = "User" # 设置表名 |
|  |  uid = Column(Integer, primary\_key=True) |
|  |  username = Column(String(80), unique=True) |
|  |  email = Column(String(120), unique=True) |
|  |  tags = Column(String(120)) |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return '' % self.username |

表的数据:

|  | mysql> select * from User; |
|  | +-----+----------+-------------------+------+ |
|  | | uid | username | email | tags | |
|  | +-----+----------+-------------------+------+ |
|  | | 1 | 张三 | zhangesan@xx.com | 热情 | |
|  | | 2 | 李四 | lisi@xx.com | 热情 | |
|  | | 3 | 王五 | wangwu@xx.com | 开朗 | |
|  | | 4 | lczmx | lczmx@foxmail.com | 热情 | |
|  | +-----+----------+-------------------+------+ |
|  | 4 rows in set (0.10 sec) |
|  |  |
|  | mysql>  |

使用例子:

|  | from sqlalchemy import Column, Integer, create\_engine, String |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  | from sqlalchemy import not\_, or\_, desc |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class User(Base): |
|  |  \_\_tablename\_\_ = "User" # 设置表名 |
|  |  uid = Column(Integer, primary\_key=True) |
|  |  username = Column(String(80), unique=True) |
|  |  email = Column(String(120), unique=True) |
|  |  tags = Column(String(120)) |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return '' % self.username |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # ------------------ 查询所有User数据 |
|  |  session.query(User).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`,  |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  """ |
|  |  |
|  | # ------------------ 查询有多少条数据 |
|  |  session.query(User).count() |
|  | """ |
|  |  对应SQL |
|  |  SELECT count(*) AS count\_1 FROM (SELECT `User`.uid AS `User\_uid`, |
|  |  `User`.username AS `User\_username`, `User`.email AS `User\_email`, |
|  |  `User`.tags AS `User\_tags` FROM `User`) AS anon\_1 |
|  |  """ |
|  |  |
|  | # ------------------ 查询第1条数据 |
|  |  session.query(User).first() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  LIMIT 1 |
|  |  """ |
|  |  |
|  | # ------------------ 根据主键查询 |
|  |  session.query(User).get(1) |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.uid = 1 |
|  |  """ |
|  |  |
|  | # ------------------ 简单查询, 使用 关键字实参 的形式来设置字段名 |
|  |  session.query(User).filter\_by(uid=1).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`,  |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User`  |
|  |  WHERE `User`.uid = 1 |
|  |  """ |
|  |  |
|  | # ------------------ 复杂查询, 可以多个表一起,使用 恒等式'==' 等形式 来设置条件 |
|  |  session.query(User).filter(User.uid == 1).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.uid = 1 |
|  |  """ |
|  |  |
|  | # ------------------ filter 查询开头 |
|  |  session.query(User).filter(User.username.startswith("l")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE (`User`.username LIKE concat('l', '%%')) |
|  |  """ |
|  |  |
|  | # ------------------ filter 查询结尾 |
|  |  session.query(User).filter(User.username.endswith("x")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE (`User`.username LIKE concat('%%', 'x')) |
|  |  """ |
|  |  |
|  | # ------------------ filter 查询是否包含 |
|  |  session.query(User).filter(User.username.contains("lcz")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE (`User`.username LIKE concat(concat('%%', "lcz", '%%'))) |
|  |  """ |
|  |  |
|  | # ------------------ filter 模糊查询 |
|  |  session.query(User).filter(User.username.like("%cz%")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.username LIKE "%cz%" |
|  |  """ |
|  |  |
|  | # ------------------ filter 条件取反 (not) |
|  |  session.query(User).filter(not\_(User.username == "lczmx")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.username != "lczmx" |
|  |  """ |
|  |  |
|  | # ------------------ filter条件 或 (or), 默认为and |
|  |  session.query(User).filter( |
|  |  or\_(User.uid == 1, User.uid == 3), ).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.uid = 1 OR `User`.uid = 3 |
|  |  """ |
|  |  |
|  | # ------------------ filter条件 and or not 一起使用 |
|  |  session.query(User).filter(or\_(User.uid == 1, User.uid == 4), User.username == "lczmx", |
|  |  not\_(User.email == "wangwu@xx.com")).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE (`User`.uid = 1 OR `User`.uid = 4) |
|  |  AND `User`.username = "lczmx" AND `User`.email = "lczmx@foxmail.com" |
|  |  """ |
|  |  |
|  | # ------------------ filter 取反查询 |
|  |  session.query(User).filter(User.username != "lczmx").all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.username != "lczmx"; |
|  |  """ |
|  |  |
|  | # ------------------ 查询uid为[1, 3, 5, 7, 9]的用户 |
|  |  session.query(User).filter(User.uid.in\_([1, 3, 5, 7, 9])).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  WHERE `User`.uid IN (1, 3, 5, 7, 9) |
|  |  """ |
|  |  |
|  | # ------------------ 分组查询 |
|  | # !! 注意不是query(User), 因为Query(User)对应的SQL为: |
|  | # SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  | # `User`.email AS `User\_email`, `User`.tags AS `User\_tags` |
|  |  session.query(User.tags).group\_by(User.tags).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.tags AS `User\_tags` FROM `User` GROUP BY `User`.tags |
|  |  """ |
|  |  |
|  | # ------------------ 排序 顺序 |
|  |  session.query(User).order\_by(User.uid).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` |
|  |  FROM `User` ORDER BY `User`.uid |
|  |  """ |
|  |  |
|  | # ------------------ 排序 倒序 |
|  |  session.query(User).order\_by(desc(User.uid)).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  ORDER BY `User`.uid DESC |
|  |  """ |
|  |  |
|  | # ------------------ 去重 |
|  |  session.query(User.tags).distinct().all() |
|  | """ |
|  |  对应SQL: |
|  |  SELECT DISTINCT `User`.tags AS `User\_tags` FROM `User`; |
|  |  """ |
|  |  |
|  | # ------------------ 取几条数据 |
|  |  session.query(User).limit(2).all() |
|  | """ |
|  |  对应SQL: |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  LIMIT 2; |
|  |  """ |
|  |  |
|  | # ------------------ 跳过几条个数据 |
|  |  session.query(User).offset(1).limit(2).all() |
|  | """ |
|  |  对应SQL |
|  |  SELECT `User`.uid AS `User\_uid`, `User`.username AS `User\_username`, |
|  |  `User`.email AS `User\_email`, `User`.tags AS `User\_tags` FROM `User` |
|  |  LIMIT 1, 2; |
|  |  """ |
|  |  |

关于query返回的对象
query的返回对象为sqlalchemy.orm.query.Query对象, 你可以与Result对象进行对比, 主要有以下的方法:

  • all()
    返回由表对象组成的列表
  • first()
    返回第一个结果 (表对象), 内部执行limit SQL
  • one()
    只返回一行数据或引发异常 (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • one_or_none()
    最多返回一行数据或引发异常 (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • scalar()
    获取第一行的第一列数据. 如果没有要获取的行, 则返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound

以上查询不包含一些连表操作, 见: relationship连表操作

2.0的方法

2.0的返回结果也是Result对象, 关于Result对象, 见: Result

注意: 由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

relationship连表操作

我们自定义外键时, 一般的步骤是:

  1. 子表使用字段名= Column(Integer, ForeignKey('主表名.主键'))的格式定义
  2. 除此外, 还需要在主表中定义relationship用于子表与主表之间的跨表查询

完整例子:

|  | from sqlalchemy import Column, Integer, create\_engine, String, Text, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker, relationship |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  |  |
|  |  |
|  | class User(Base): |
|  |  \_\_tablename\_\_ = 'user' |
|  |  uid = Column(Integer, primary\_key=True, autoincrement=True) |
|  |  username = Column(String(16), nullable=False) |
|  |  password = Column(String(32), nullable=False) |
|  | # Article是类名 user是反向访问的属性名称 |
|  |  article = relationship("Article", backref="user") |
|  | """ |
|  |  article = relationship("Article", backref="user") |
|  |  相当于: |
|  |  class User(Base): |
|  |  # Article是类名 user是反向访问的属性名称 |
|  |  article = relationship("Article", back\_populates="user") |
|  |   |
|  |  class Article(Base): |
|  |  # User是类名 addresses是反向访问的属性名称 |
|  |  user = relationship("User", back\_populates="addresses") |
|  |   |
|  |  """ |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return "" % self.username |
|  |  |
|  |  |
|  | class Article(Base): |
|  |  \_\_tablename\_\_ = 'article' |
|  |  aid = Column(Integer, primary\_key=True, autoincrement=True) |
|  |  title = Column(String(36), nullable=False) |
|  |  content = Column(Text, nullable=False) |
|  |  author\_id = Column(Integer, ForeignKey("user.uid")) |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return "" % self.title |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |

关于relationship
relationship在定义外键时, 有非常重要的作用, 如: 1. 跨表操作 2. 设置删除主表数据时子表的值
relationship的参数很多, 这里只列出常用的几个, 全部参数见文档: relationship

  • back_populates
    指定反向访问的属性名称
  • backref
    快捷设置两个relationship (设置back_populates的话, 要设置两个表)
  • cascade
    用于控制修改数据时的选项
说明
save-update 默认选项, 在添加一条数据的时候,会把其他和它相关联的数据都添加到数据库中。这种行为就是save-update属性影响的
delete 表示当删除某一个模型中的数据的时候,是否也删除掉使用relationship和它关联的数据
delete-orphan 表示当对一个ORM对象解除了父表中的关联对象的时候,自己便会被删除掉。当然如果表中的数据被删除,自己也会被删除。这个选项只能用在一对多上,不能用在多对多以及多对一上。并且还需要在子表中的relationship中,增加一个single_parent=True的参数
merge 默认选项, 当在使用session.merge,合并一个对象的时候,会将使用了relationship相关联的对象也进行merge操作
expunge 移除操作的时候,会将相关联的对象也进行移除。这个操作只是从session中移除,并不会真正的从数据库中删除
all 是对save-updatemergerefresh-expireexpungedelete几种的填写

比如:

|  | articles = relationship("Article",cascade="save-update,delete") |
  • order_by
    子表列表的排序方式
|  | # 倒序 |
|  | article = relationship("Article", backref="user", order\_by="Article.aid.desc()") |
|  | # 正序 |
|  | article = relationship("Article", backref="user", order\_by="Article.aid") |

本部分包括Core与ORM的跨表增删改查操作

|  | select `user`.uid as uid, `user`.username, `user`.password, |
|  |  (select GROUP\_CONCAT(`article`.title) from article  |
|  | where `article`.author\_id = `user`.uid) as article |
|  |  from user; |
|  |  |
|  | +-----+----------+----------+---------------+ |
|  | | uid | username | password | article | |
|  | +-----+----------+----------+---------------+ |
|  | | 1 | 张三 | 12345 | C++入门,C入门 | |
|  | | 2 | 李四 | 12346 | python入门 | |
|  | +-----+----------+----------+---------------+ |

GROUP_CONCAT可以让多行数据拼接成一行数据, 以,链接

通过relationship双向访问

即直接通过relationship访问主表或子表

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # +++++++++++++++++++++++++ 子表访问主表, 返回主表对象 |
|  | """ |
|  |  实质上内部执行的SQL |
|  |  SELECT user.uid AS user\_uid, user.username AS user\_username, user.password AS user\_password |
|  |  FROM user |
|  |  WHERE user.uid = %(pk\_1)s |
|  |  """ |
|  | # ----------- 1.x方法 |
|  |  article\_1x = session.query(Article).first() |
|  |  print(article\_1x.user) #  |
|  | # ----------- 2.0 方法 |
|  |  article\_20 = session.execute(select(Article)).first() |
|  |  print(article\_20.Article.user) #  |
|  |  |
|  | # +++++++++++++++++++++++++ 主表访问子表, 返回子表列表 |
|  | # 实质是 sqlalchemy.orm.collections.InstrumentedList 对象 |
|  | # 是List的子类 |
|  | """ |
|  |  实质上内部执行的SQL |
|  |  SELECT article.aid AS article\_aid, article.title AS article\_title, |
|  |  article.content AS article\_content, article.author\_id AS article\_author\_id |
|  |  FROM article |
|  |  WHERE %(param\_1)s = article.author\_id ORDER BY article.aid |
|  |  """ |
|  | # ----------- 1.x方法 |
|  |  user\_1x = session.query(User).first() |
|  |  print(user\_1x.article) |
|  | # [, ] |
|  | # ----------- 2.0方法 |
|  |  user\_20 = session.execute(select(User)).first() |
|  |  print(user\_20.User.article) |
|  | # [, ] |
|  |  |

通过relationship修改关联关系

上面例子中说过, 主表.relationship字段InstrumentedList对象 (类似于List), 我们可以修改它, 然后调用commit方法即可. 子表.relationship字段是对应的主表, 可以修改为自己想要的, 同样调用commit方法即可

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # +++++++++++++++++++++++++ 子表修改属于的主表 |
|  | """ |
|  |  对于SQL |
|  |  UPDATE article  |
|  |  SET author\_id=%(author\_id)s  |
|  |  WHERE article.aid = %(article\_aid)s |
|  |  """ |
|  | # ----------- 1.x方法 |
|  |  lisi = session.query(User).get(2) |
|  |  |
|  |  article\_1x = session.query(Article).first() |
|  |  print(article\_1x.user) #  |
|  |  article\_1x.user = lisi # 改为  |
|  |  session.commit() # 记得提交 |
|  | # ----------- 2.0 方法 |
|  |  zhangsan = session.execute(select(User).where(User.uid == 1)).first() |
|  |  |
|  |  article\_20 = session.execute(select(Article)).first() |
|  |  print(article\_20.Article.user) #  |
|  |  article\_20.Article.user = zhangsan.User # 改为  |
|  |  session.commit() # 记得提交 |
|  |  |
|  | # +++++++++++++++++++++++++ 主表修改子表列表 |
|  |  |
|  | # ----------- 1.x方法 增加 |
|  |  user\_1x = session.query(User).first() |
|  | # 添加一个新的子表数据 |
|  | # 会在Article中插入一条新的数据 |
|  |  user\_1x.article.append(Article(title="javascript 入门", content="console.log(hello world)")) |
|  |  session.commit() # 记得提交 |
|  |  |
|  | # ----------- 2.0 方法 移除 |
|  |  user\_20 = session.execute(select(User)).first() |
|  |  print(user\_20.User.article) # [, , ] |
|  |  |
|  |  article\_js = session.execute(select(Article).where(Article.aid == 4)).scalar() |
|  | # 从主表中移除与子表的关系 |
|  |  user\_20.User.article.remove(article\_js) |
|  |  session.commit() # 记得提交 |
|  |  |

通过relationship修改子/主表数据

同样非常简单, 找到对应的类, 然后修改数据并commit即可

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # +++++++++++++++++++++++++ 通过子表修改对应主表的数据 |
|  | # ----------- 1.x方法 |
|  |  article\_1x = session.query(Article).first() |
|  |  print(article\_1x.user) #  |
|  |  |
|  |  article\_1x.user.username = "张三二号" |
|  |  session.commit() # 记得提交 |
|  | # ----------- 2.0 方法 |
|  |  |
|  |  article\_20 = session.execute(select(Article)).first() |
|  |  print(article\_20.Article.user) #  |
|  |  |
|  |  article\_20.Article.user.username = "张三" |
|  |  session.commit() # 记得提交 |
|  |  |
|  | # +++++++++++++++++++++++++ 通过主表修改对应子表的数据 |
|  | # ----------- 1.x方法 |
|  |  user\_1x = session.query(User).first() |
|  |  print(user\_1x.article) # [, , ] |
|  |  |
|  |  user\_1x.article[-1].title = "js入门" |
|  |  session.commit() # 记得提交 |
|  | # ----------- 2.0 方法 |
|  |  user\_20 = session.execute(select(User)).scalar() |
|  |  |
|  |  print(user\_20.article) # [, , ] |
|  |  |
|  |  user\_20.article[-1].title = "javascript 入门" |
|  |  session.commit() # 记得提交 |
|  |  |

通过relationship查询数据

正向查询, 子表利用主表的条件查询, 使用has, 条件和普通查询的条件一样
反向查询, 主表利用子表的条件查询, 使用any, 条件和普通查询的条件一样

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  |  |
|  | with SessionLocal() as session: |
|  | # +++++++++++++++++++++++++ 反向查询 |
|  | """ |
|  |  对应的SQL |
|  |  SELECT user.uid AS user\_uid, user.username AS user\_username, user.password AS user\_password |
|  |  FROM user |
|  |  WHERE EXISTS (SELECT 1 |
|  |  FROM article |
|  |  WHERE user.uid = article.author\_id AND article.title LIKE 'python%') |
|  |  """ |
|  | # ********* 查询有以python开头的Article的User |
|  | # ----------- 1.x方法 |
|  |  user\_1x = session.query(User).filter(User.article.any(Article.title.like("python%"))) |
|  |  print(user\_1x.all()) # [] |
|  |  |
|  | # ----------- 2.0方法 |
|  |  user\_20 = session.execute( |
|  |  select(User).where(User.article.any(Article.title.like("python%")))) |
|  |  print(user\_20.all()) # [(,)] |
|  |  |
|  | # +++++++++++++++++++++++++ 正向查询 |
|  | """ |
|  |  对应的SQL |
|  |  SELECT article.aid AS article\_aid, article.title AS article\_title,  |
|  |  article.content AS article\_content, article.author\_id AS article\_author\_id  |
|  |  FROM article  |
|  |  WHERE EXISTS (SELECT 1  |
|  |  FROM user  |
|  |  WHERE user.uid = article.author\_id AND (user.username LIKE concat(concat('%%', '四', '%%'))) |
|  |  """ |
|  | # ********* 查询User表中username有 四 的Article |
|  | # ----------- 1.x方法 |
|  |  article\_1x = session.query(Article).filter(Article.user.has(User.username.contains("四"))) |
|  |  print(article\_1x.all()) # [] |
|  |  |
|  | # ----------- 2.0方法 |
|  |  article\_20 = session.execute( |
|  |  select(Article).where(Article.user.has(User.username.contains("四")))) |
|  |  print(article\_20.all()) # [(,)] |
|  |  |

建立多对多关系

可以通过relationship便捷使用多对多关系

|  | from sqlalchemy import Table, Text, Column, ForeignKey |
|  |  |
|  | # 第三张表 |
|  | post\_keywords = Table("post\_keywords", Base.metadata, |
|  |  Column('post\_id', ForeignKey('posts.id'), primary\_key=True), |
|  |  Column('keyword\_id', ForeignKey('keywords.id'), primary\_key=True) |
|  |  ) |
|  |  |
|  |  |
|  | class BlogPost(Base): |
|  |  \_\_tablename\_\_ = 'posts' |
|  |  |
|  | id = Column(Integer, primary\_key=True) |
|  |  headline = Column(String(255), nullable=False) |
|  |  body = Column(Text) |
|  |  |
|  | # 多对多关系 BlogPost<->Keyword |
|  |  keywords = relationship('Keyword', |
|  |  secondary=post\_keywords, |
|  |  back\_populates='posts') |
|  |  |
|  | def \_\_init\_\_(self, headline, body): |
|  |  self.headline = headline |
|  |  self.body = body |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return "BlogPost(%r, %r)" % (self.headline, self.body) |
|  |  |
|  |  |
|  | class Keyword(Base): |
|  |  \_\_tablename\_\_ = 'keywords' |
|  | id = Column(Integer, primary\_key=True) |
|  |  keyword = Column(String(50), nullable=False, unique=True) |
|  | # 多对多关系 Keyword<->BlogPost |
|  |  posts = relationship('BlogPost', |
|  |  secondary=post\_keywords, |
|  |  back\_populates='keywords') |
|  |  |
|  | def \_\_init\_\_(self, keyword): |
|  |  self.keyword = keyword |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |

添加操作例子:

|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | with SessionLocal() as session: |
|  | # 比如添加一篇文章, 为文章添加多个keyword |
|  |  blog = BlogPost(headline="起飞!", body="我是文章的内容") |
|  | # 获取子表列表 并 append |
|  |  blog.keywords.append(Keyword("新闻")) |
|  |  blog.keywords.append(Keyword("热门")) |
|  |  session.add(blog) |
|  |  session.commit() |
|  | # ------- 添加第二篇文章 |
|  |  keyword1 = session.execute( |
|  |  select(Keyword).filter\_by(keyword="热门") |
|  |  ).scalar() |
|  |  new\_blog = BlogPost(headline="震惊!", body="我是第二篇文章的内容") |
|  |  new\_blog.keywords.append(keyword1) |
|  |  session.add(new\_blog) |
|  |  session.commit() |
|  |  |

查询操作例子:

|  | from sqlalchemy import select |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | with SessionLocal() as session: |
|  | # 查询所有 "热门" 文章 |
|  |  blog = session.execute( |
|  |  select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "热门")) |
|  |  ).all() |
|  |  |
|  |  print(blog) |
|  | # [(BlogPost('起飞!', '我是文章的内容'),),  |
|  | # (BlogPost('震惊!', '我是第二篇文章的内容'),)] |
|  |  |

其他操作也像一般的relationship一样操作

项目示范

一般来说, 我们使用SQLAlchemy的步骤大多相同, 下面
文件结构:

|  | +--- database.py # 用于 初始化session 和 公共基类 |
|  | +--- models.py # 定义表 |
|  | +--- crud.py # 封装增删改查的方法 |
|  | +--- schemas.py # 定义pydantic模型, 用于格式化已经取得的数据 [可选] |
|  | +--- main.py # 执行主逻辑 |

database.py 初始化session和 公共基类:

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 数据库的URL |
|  | # 替换成自己的 |
|  | SQLALCHEMY\_DATABASE\_URL = 'sqlite:///./coronavirus.sqlite3' |
|  |  |
|  | # 创建引擎 |
|  | engine = create\_engine( |
|  | # echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志 |
|  |  SQLALCHEMY\_DATABASE\_URL, encoding='utf-8', echo=True |
|  | ) |
|  |  |
|  | # SessionLocal用于对数据的增删改查 |
|  | # flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件 |
|  | SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire\_on\_commit=True) |
|  |  |
|  | # 创建基本映射类, 用于创建表 |
|  | Base = declarative\_base(bind=engine, name='Base') |
|  |  |

models.py 定义表结构

|  | from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func |
|  | from sqlalchemy.orm import relationship |
|  | # 导入公共基类 |
|  | from .database import Base |
|  |  |
|  |  |
|  | class City(Base): |
|  |  \_\_tablename\_\_ = 'city' |
|  |  |
|  | id = Column(Integer, primary\_key=True, index=True, autoincrement=True) |
|  |  province = Column(String(100), unique=True, nullable=False, comment='省/直辖市') |
|  |  country = Column(String(100), nullable=False, comment='国家') |
|  |  country\_code = Column(String(100), nullable=False, comment='国家代码') |
|  |  country\_population = Column(BigInteger, nullable=False, comment='国家人口') |
|  |  data = relationship('Data', back\_populates='city') # 'Data'是关联的类名;back\_populates来指定反向访问的属性名称 |
|  |  |
|  |  created\_at = Column(DateTime, server\_default=func.now(), comment='创建时间') |
|  |  updated\_at = Column(DateTime, server\_default=func.now(), onupdate=func.now(), comment='更新时间') |
|  |  |
|  |  \_\_mapper\_args\_\_ = {"order\_by": country\_code} # 默认是正序,倒序加上.desc()方法 |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return f'{self.country}\_{self.province}' |
|  |  |
|  |  |
|  | class Data(Base): |
|  |  \_\_tablename\_\_ = 'data' |
|  |  |
|  | id = Column(Integer, primary\_key=True, index=True, autoincrement=True) |
|  |  city\_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市') # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名 |
|  |  date = Column(Date, nullable=False, comment='数据日期') |
|  |  confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量') |
|  |  deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量') |
|  |  recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量') |
|  |  city = relationship('City', back\_populates='data') # 'City'是关联的类名;back\_populates来指定反向访问的属性名称 |
|  |  |
|  |  created\_at = Column(DateTime, server\_default=func.now(), comment='创建时间') |
|  |  updated\_at = Column(DateTime, server\_default=func.now(), onupdate=func.now(), comment='更新时间') |
|  |  |
|  |  \_\_mapper\_args\_\_ = {"order\_by": date.desc()} # 按日期降序排列 |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return f'{repr(self.date)}:确诊{self.confirmed}例' |
|  |  |

crud.py 封装增删改查的方法:

|  | from sqlalchemy.orm import Session |
|  |  |
|  | import models |
|  | import schemas |
|  |  |
|  |  |
|  | def get\_city(db: Session, city\_id: int): |
|  | return db.query(models.City).filter(models.City.id == city\_id).first() |
|  |  |
|  |  |
|  | def get\_city\_by\_name(db: Session, name: str): |
|  | return db.query(models.City).filter(models.City.province == name).first() |
|  |  |
|  |  |
|  | def get\_cities(db: Session, skip: int = 0, limit: int = 10): |
|  | return db.query(models.City).offset(skip).limit(limit).all() |
|  |  |
|  |  |
|  | def create\_city(db: Session, city: schemas.CreateCity): |
|  | """ |
|  |  创建City数据 |
|  |  """ |
|  |  db\_city = models.City(**city.dict()) |
|  |  db.add(db\_city) |
|  |  db.commit() |
|  |  db.refresh(db\_city) |
|  | return db\_city |
|  |  |
|  |  |
|  | def get\_data(db: Session, city: str = None, skip: int = 0, limit: int = 10): |
|  | if city: |
|  | # 外键关联查询,这里不是像Django ORM那样Data.city.province |
|  | return db.query(models.Data).filter(models.Data.city.has(province=city)) |
|  | return db.query(models.Data).offset(skip).limit(limit).all() |
|  |  |
|  |  |
|  | def create\_city\_data(db: Session, data: schemas.CreateData, city\_id: int): |
|  | """ |
|  |  创建Data数据 |
|  |  """ |
|  |  db\_data = models.Data(**data.dict(), city\_id=city\_id) |
|  |  db.add(db\_data) |
|  |  db.commit() |
|  |  db.refresh(db\_data) |
|  | return db\_data |
|  |  |

schemas.py 定义pydantic模型, 用于格式化已经取得的数据:

|  | from datetime import date as date\_ |
|  | from datetime import datetime |
|  |  |
|  | from pydantic import BaseModel |
|  |  |
|  |  |
|  | class CreateData(BaseModel): |
|  |  date: date\_ |
|  |  confirmed: int = 0 |
|  |  deaths: int = 0 |
|  |  recovered: int = 0 |
|  |  |
|  |  |
|  | class CreateCity(BaseModel): |
|  |  province: str |
|  |  country: str |
|  |  country\_code: str |
|  |  country\_population: int |
|  |  |
|  |  |
|  | class ReadData(CreateData): |
|  | id: int |
|  |  city\_id: int |
|  |  updated\_at: datetime |
|  |  created\_at: datetime |
|  |  |
|  | class Config: |
|  |  orm\_mode = True |
|  |  |
|  |  |
|  | class ReadCity(CreateCity): |
|  | id: int |
|  |  updated\_at: datetime |
|  |  created\_at: datetime |
|  |  |
|  | class Config: |
|  |  orm\_mode = True |
|  |  |

main.py 执行主逻辑:

|  | from sqlalchemy.orm import Session |
|  |  |
|  | import crud |
|  | import schemas |
|  |  |
|  | from database import engine, Base, SessionLocal |
|  | from models import City, Data |
|  |  |
|  | # 创建表, 已经存在的将被忽略 |
|  | Base.metadata.create\_all(bind=engine) |
|  | db = SessionLocal() |
|  | # 调用 crud |
|  | db\_city = crud.get\_city\_by\_name(db, name="广东省") |
|  | if db\_city: |
|  | raise Exception("City already registered") |
|  |  |
|  | # 创建数据 |
|  | city = City(...) |
|  | crud.create\_city(db=db, city=city) |
|  |  |

查询数据详解

只有涉及SQL 的查询数据, 必然绕不开FROM WHERE SELECT GROUP BY HAVING ORDER BY LIMIT INNER JOIN ... ON LEFT JOIN ... ON UNION 这些SQL查询语法, 那么它们在SQLAlchemy中是如何表示的呢? 实际上和SQL语句一样, SQLAlchemy的语法也有select where join order_by group_by having 等 …

注意: 这些方法在Core与ORM中都适用

刚接触可能会觉得比较复杂, 但是假如有SQL基础的话, 用起来比较简单.
要查询的表结构为:

|  | # +++++++++++++++++++ 使用Core定义 +++++++++++++++++++ |
|  | from sqlalchemy import MetaData, create\_engine |
|  | from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text |
|  |  |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8". |
|  | format(**DATABASE\_CONFIG), echo=True, future=True) |
|  | metadata\_obj = MetaData() |
|  |  |
|  | user\_table = Table( |
|  | "user", |
|  |  metadata\_obj, |
|  |  Column("uid", Integer, primary\_key=True, autoincrement=True), |
|  |  Column("username", String(16), nullable=False), |
|  |  Column("password", String(32), nullable=False), |
|  | ) |
|  |  |
|  | article\_table = Table( |
|  | 'article', |
|  |  metadata\_obj, |
|  |  Column("aid", Integer, primary\_key=True, autoincrement=True), |
|  |  Column("title", String(36), nullable=False), |
|  |  Column("content", Text, nullable=False), |
|  |  Column("author\_id", Integer, ForeignKey("user.uid")) |
|  | ) |
|  |  |
|  | metadata\_obj.create\_all(bind=engine) |
|  |  |
|  |  |
|  | # +++++++++++++++++++ 使用ORM定义 +++++++++++++++++++ |
|  | from sqlalchemy import Column, Integer, create\_engine, String, Text, ForeignKey |
|  | from sqlalchemy.orm import declarative\_base, relationship |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}". |
|  | format(**DATABASE\_CONFIG), echo=True, future=True) |
|  |  |
|  |  |
|  | class User(Base): |
|  |  \_\_tablename\_\_ = 'user' |
|  |  uid = Column(Integer, primary\_key=True, autoincrement=True) |
|  |  username = Column(String(16), nullable=False) |
|  |  password = Column(String(32), nullable=False) |
|  |  |
|  | # Article是类名 user是反向访问的属性名称 |
|  |  article = relationship("Article", backref="user") |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return "" % self.username |
|  |  |
|  |  |
|  | class Article(Base): |
|  |  \_\_tablename\_\_ = 'article' |
|  |  aid = Column(Integer, primary\_key=True, autoincrement=True) |
|  |  title = Column(String(36), nullable=False) |
|  |  content = Column(Text, nullable=False) |
|  |  author\_id = Column(Integer, ForeignKey("user.uid")) |
|  |  |
|  | def \_\_repr\_\_(self): |
|  | return "" % self.title |
|  |  |
|  |  |
|  | Base.metadata.create\_all(bind=engine) |
|  |  |

表数据为:

|  | mysql> select * from article, user where user.uid=article.author\_id; |
|  | +-----+------------+--------------------+-----------+-----+----------+----------+ |
|  | | aid | title | content | author\_id | uid | username | password | |
|  | +-----+------------+--------------------+-----------+-----+----------+----------+ |
|  | | 1 | C++入门 | c++ hello world | 1 | 1 | 张三 | 12345 | |
|  | | 2 | C入门 | c hello world | 1 | 1 | 张三 | 12345 | |
|  | | 3 | python入门 | print(hello world) | 2 | 2 | 李四 | 12346 | |
|  | +-----+------------+--------------------+-----------+-----+----------+----------+ |
|  | 3 rows in set (0.12 sec) |

select

该函数可以指定要查询的表、列及添加别名

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ 一般查询 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password FROM user |
|  | WHERE user.uid = 2 |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute(select(user\_table).where(user\_table.c.uid == 2)) |
|  | print(result\_core\_1.all()) # [(2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute(select(User).where(User.uid == 2)) |
|  | print(result\_orm\_1.all()) # [(,)] |
|  |  |
|  | # 2 ++++++++++++++++++ 查询全部列 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute(select(user\_table)) |
|  | print(result\_core\_2.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute(select(User)) |
|  | print(result\_orm\_2.all()) # [(,), (,)] |
|  |  |
|  | # 3 ++++++++++++++++++ 查询指定的列 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username  |
|  | FROM user |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_3 = conn.execute(select(user\_table.c.username)) |
|  | print(result\_core\_3.all()) # [('张三',), ('李四',)] |
|  | # -------------- ORM |
|  | result\_orm\_3 = session.execute(select(User.username)) |
|  | print(result\_orm\_3.all()) # [('张三',), ('李四',)] |
|  |  |
|  | # 4 ++++++++++++++++++ 两个表 联合查询 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user, article  |
|  | WHERE user.uid = article.author\_id |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_4 = conn.execute( |
|  |  select(user\_table.c.username, article\_table.c.title).where( |
|  |  user\_table.c.uid == article\_table.c.author\_id)) |
|  | # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | print(result\_core\_4.all()) |
|  | # -------------- ORM |
|  | result\_orm\_4 = session.execute(select(User.username, Article.title).where( |
|  |  User.uid == Article.author\_id)) |
|  | # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | print(result\_orm\_4.all()) |
|  |  |
|  | # 5 ++++++++++++++++++ 为列添加别名 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username AS name  |
|  | FROM user  |
|  | WHERE user.uid = 1 |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_5 = conn.execute( |
|  |  select((user\_table.c.username).label("name")).where( |
|  |  user\_table.c.uid == 1)) |
|  | # print(result\_core\_5.all()) # [('张三',)] |
|  | # -------------- ORM |
|  | result\_orm\_5 = session.execute( |
|  |  select((User.username).label("name")).where( |
|  |  User.uid == 1)) |
|  | # print(result\_orm\_5.all()) # [('张三',)] |
|  |  |
|  | # 利用别名 取值 |
|  | print(result\_core\_5.first().name) # 张三 |
|  | print(result\_orm\_5.first().name) # 张三 |
|  |  |
|  | # 6 ++++++++++++++++++ 为表添加别名 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user\_1.username  |
|  | FROM user AS user\_1  |
|  | WHERE user\_1.uid = 1 |
|  | """ |
|  | # -------------- Core |
|  | user\_table\_core\_alias = user\_table.alias() |
|  | result\_core\_6 = conn.execute( |
|  |  select(user\_table\_core\_alias.c.username).where( |
|  |  user\_table\_core\_alias.c.uid == 1)) |
|  | print(result\_core\_6.all()) # [('张三',)] |
|  | # -------------- ORM |
|  | from sqlalchemy.orm import aliased |
|  |  |
|  | user\_table\_orm\_alias = aliased(User) |
|  | result\_orm\_6 = session.execute( |
|  |  select(user\_table\_orm\_alias.username).where( |
|  |  user\_table\_orm\_alias.uid == 1)) |
|  | print(result\_orm\_6.all()) # [('张三',)] |
|  |  |
|  | # 7 ++++++++++++++++++ 与text 结合, 添加额外列 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT now() AS now, user.username, '自定义字符'  |
|  | FROM user |
|  | """ |
|  |  |
|  | from sqlalchemy import literal\_column, text |
|  |  |
|  | # literal\_column表示一列数据 |
|  | # text可以转化成SQL |
|  |  |
|  | # -------------- Core |
|  | result\_core\_7 = conn.execute( |
|  |  select(literal\_column("now()").label("now"), user\_table.c.username, text("'自定义字符'"))) |
|  | print(result\_core\_7.all()) |
|  | """ |
|  | [(datetime.datetime(2022, 1, 8, 18, 54, 44), '张三', '自定义字符'),  |
|  | (datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定义字符')] |
|  | """ |
|  | # -------------- ORM |
|  | result\_orm\_7 = session.execute( |
|  |  select(literal\_column("now()").label("now"), User.username, text("'自定义字符'"))) |
|  | print(result\_orm\_7.all()) |
|  | """ |
|  | [(datetime.datetime(2022, 1, 8, 18, 59, 42), '张三', '自定义字符'),  |
|  | (datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定义字符')] |
|  | """ |
|  |  |

where

过滤数据, Coretable.c.xxx获取行, 假如是ORM的话为:类名.属性名

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ 条件默认为and ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid < 3 AND user.uid > 1 |
|  | """ |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute( |
|  |  select(user\_table).where(user\_table.c.uid < 3, user\_table.c.uid > 1)) |
|  | print(result\_core\_1.all()) # [(2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1)) |
|  | print(result\_orm\_1.all()) # [(,)] |
|  |  |
|  | # 2 ++++++++++++++++++ 修改条件为not or ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四" |
|  | """ |
|  | from sqlalchemy import or\_, not\_ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute( |
|  |  select(user\_table).where( |
|  |  or\_(user\_table.c.uid == 1, user\_table.c.uid == 2), |
|  |  not\_(user\_table.c.username == "李四"), |
|  |  )) |
|  | print(result\_core\_2.all()) # [(1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute(select(User).where( |
|  |  or\_(User.uid == 1, User.uid == 2), |
|  |  not\_(User.username == "李四"))) |
|  | print(result\_orm\_2.all()) # [(,)] |
|  |  |
|  | # 3 ++++++++++++++++++ startswith ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE (user.username LIKE concat('张', '%%')) |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_3 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.username.startswith("张"))) |
|  | print(result\_core\_3.all()) # [(1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_3 = session.execute(select(User).where( |
|  |  User.username.startswith("张"))) |
|  | print(result\_orm\_3.all()) # [(,)] |
|  |  |
|  | # 4 ++++++++++++++++++ endswith ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE (user.username LIKE concat('%%', '三')) |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_4 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.username.endswith("三"))) |
|  | print(result\_core\_4.all()) # [(1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_4 = session.execute(select(User).where( |
|  |  User.username.endswith("三"))) |
|  | print(result\_orm\_4.all()) # [(,)] |
|  |  |
|  | # 5 ++++++++++++++++++ endswith ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE (user.username LIKE concat(concat('%%', '三', '%%')) |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_5 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.username.contains("三"))) |
|  | print(result\_core\_5.all()) # [(1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_5 = session.execute(select(User).where( |
|  |  User.username.contains("三"))) |
|  | print(result\_orm\_5.all()) # [(,)] |
|  |  |
|  | # 6 ++++++++++++++++++ like ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.username LIKE '%三' |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_6 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.username.like("%三"))) |
|  | print(result\_core\_6.all()) # [(1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_6 = session.execute(select(User).where( |
|  |  User.username.like("%三"))) |
|  | print(result\_orm\_6.all()) # [(,)] |
|  |  |
|  | # 7 ++++++++++++++++++ in ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid IN (1, 2) |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_7 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.uid.in\_((1, 2)) |
|  |  )) |
|  | print(result\_core\_7.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_7 = session.execute( |
|  |  select(User).where( |
|  |  User.uid.in\_((1, 2)) |
|  |  )) |
|  | print(result\_orm\_7.all()) # [(,), (,)] |
|  |  |
|  | # 8 ++++++++++++++++++ between ... and ... ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid BETWEEN 1 AND 2 |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_8 = conn.execute( |
|  |  select(user\_table).where( |
|  |  user\_table.c.uid.between(1, 2) |
|  |  )) |
|  | print(result\_core\_8.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_8 = session.execute( |
|  |  select(User).where( |
|  |  User.uid.between(1, 2) |
|  |  )) |
|  | print(result\_orm\_8.all()) # [(,), (,)] |
|  |  |
|  | # 9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid IS NOT NULL |
|  | """ |
|  |  |
|  | from sqlalchemy import not\_ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_9 = conn.execute( |
|  |  select(user\_table).where( |
|  | # user\_table.c.uid.is\_(None), # IS NULL |
|  |  not\_(user\_table.c.uid.is\_(None)), # IS NOT NULL |
|  |  )) |
|  | print(result\_core\_9.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_9 = session.execute( |
|  |  select(User).where( |
|  | # User.uid.is\_(None), # IS NULL |
|  |  not\_(User.uid.is\_(None)), # IS NOT NULL |
|  |  )) |
|  | print(result\_orm\_9.all()) # [(,), (,)] |
|  |  |

除此之外, 你还可以使用一些其他运算符, 详情见: Operator Reference

order_by

order_by用于排序

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ 根据某一列排序 (默认升序) ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user ORDER BY user.uid |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute( |
|  |  select(user\_table).order\_by(user\_table.c.uid)) |
|  | print(result\_core\_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute( |
|  |  select(User).order\_by(User.uid)) |
|  | print(result\_orm\_1.all()) # [(,), (,)] |
|  |  |
|  | # 2 ++++++++++++++++++ 手动指定升序/降序 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user ORDER BY user.uid ASC/DESC |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute( |
|  | # select(user\_table).order\_by(user\_table.c.uid.asc()) # 升序 |
|  |  select(user\_table).order\_by(user\_table.c.uid.desc()) # 降序 |
|  | ) |
|  | print(result\_core\_2.all()) |
|  | # [(1, '张三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '张三', '12345')] |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute( |
|  | # select(User).order\_by(User.uid.asc()) # 升序 |
|  |  select(User).order\_by(User.uid.desc()) # 降序 |
|  | ) |
|  | print(result\_orm\_2.all()) |
|  | # [(,), (,)] / [(,), (,)] |
|  |  |
|  | # 3 ++++++++++++++++++ 根据别名排序 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username AS name  |
|  | FROM user ORDER BY name DESC |
|  | """ |
|  | from sqlalchemy import desc, asc |
|  |  |
|  | # -------------- Core |
|  | result\_core\_3 = conn.execute( |
|  |  select((user\_table.c.username).label("name")).order\_by(desc("name")) |
|  | ) |
|  | print(result\_core\_3.all()) # [('张三',), ('李四',)] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_3 = session.execute( |
|  |  select((User.username).label("name")).order\_by(desc("name")) |
|  | ) |
|  | print(result\_orm\_3.all()) # [('张三',), ('李四',)] |
|  |  |

group_by和having

group_by用于分组, having类似于where, 但可以对已分组数据使用聚合函数

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select, func |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ group\_by一般使用 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT count(article.author\_id) AS count  |
|  | FROM article GROUP BY article.author\_id |
|  | """ |
|  |  |
|  | # func 内有很多内置函数 |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute( |
|  |  select(func.count(article\_table.c.author\_id).label("count")).group\_by(article\_table.c.author\_id) |
|  | ) |
|  | print(result\_core\_1.all()) # [(2,), (1,)] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute( |
|  |  select(func.count(Article.author\_id).label("count")).group\_by(Article.author\_id) |
|  | ) |
|  | print(result\_orm\_1.all()) # [(2,), (1,)] |
|  |  |
|  | # 2 ++++++++++++++++++ group by + having ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT count(article.author\_id) AS count  |
|  | FROM article GROUP BY article.author\_id  |
|  | HAVING count(article.author\_id) > 1 |
|  | """ |
|  |  |
|  | # func 内有很多内置函数 |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute( |
|  |  select(func.count(article\_table.c.author\_id).label("count")).group\_by( |
|  |  article\_table.c.author\_id).having(func.count(article\_table.c.author\_id) > 1) |
|  | ) |
|  | print(result\_core\_2.all()) # [(2,)] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute( |
|  |  select(func.count(Article.author\_id).label("count")).group\_by( |
|  |  Article.author\_id).having(func.count(Article.author\_id) > 1) |
|  | ) |
|  | print(result\_orm\_2.all()) # [(2,)] |
|  |  |

除了count外, 还有其他的方法, 详情见: 内置函数

limit和offset

limit: 表示取几条数据, offset: 表示要跳过多少条数据

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ 仅使用LIMIT ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | LIMIT 1 |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute(select(user\_table).limit(1)) |
|  | print(result\_core\_1.all()) # [(1, '张三', '12345')] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute(select(User).limit(1)) |
|  | print(result\_orm\_1.all()) # [(,)] |
|  |  |
|  | # 2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | LIMIT 1, 1 |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute(select(user\_table).limit(1).offset(1)) |
|  | print(result\_core\_2.all()) # [(2, '李四', '12346')] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute(select(User).limit(1).offset(1)) |
|  | print(result\_orm\_2.all()) # [(,)] |
|  |  |

去重

你可以在查询的时候使用SQL进行去重, 亦可以在取到数据后进行去重

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  | # 1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT DISTINCT article.author\_id  |
|  | FROM article |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute( |
|  |  select(article\_table.c.author\_id).distinct() |
|  | ) |
|  | print(result\_core\_1.all()) # [(1,), (2,)] |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute( |
|  |  select(Article.author\_id).distinct()) |
|  | print(result\_orm\_1.all()) # [(1,), (2,)] |
|  |  |
|  | # 2 ++++++++++++++++++ 在结果中去重 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT article.author\_id  |
|  | FROM article |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute(select(article\_table.c.author\_id)) |
|  | print(result\_core\_2.unique().all()) # [(1,), (2,)] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute(select(Article.author\_id)) |
|  | print(result\_orm\_2.unique().all()) # [(1,), (2,)] |
|  |  |

连接查询

内连接查询 外连接查询 完全连接查询三种连接查询方式, 在SQLAlchemy中分别对应着join(...) join(..., isouter=True) join(..., full=True)

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式一 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user INNER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1 = conn.execute( |
|  |  select(user\_table.c.username, article\_table.c.title).join\_from(user\_table, article\_table)) |
|  | print(result\_core\_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | # -------------- ORM |
|  | result\_orm\_1 = session.execute( |
|  |  select(User.username, Article.title).join\_from(User, Article)) |
|  | print(result\_orm\_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  |  |
|  | # 2 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式二 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user INNER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2 = conn.execute( |
|  |  select(user\_table.c.username, article\_table.c.title).join(article\_table)) |
|  | print(result\_core\_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | # -------------- ORM |
|  | result\_orm\_2 = session.execute(select(User.username, Article.title).join(Article)) |
|  | print(result\_orm\_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  |  |
|  | # 3 ++++++++++++++++++ 内连接查询 (手动指定join的表) ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user INNER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_3 = conn.execute( |
|  |  select(user\_table.c.username, article\_table.c.title).select\_from(user\_table).join(article\_table)) |
|  | print(result\_core\_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | # -------------- ORM |
|  | result\_orm\_3 = session.execute( |
|  |  select(User.username, Article.title).select\_from(User).join(Article)) |
|  | print(result\_orm\_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  |  |
|  | # 4 ++++++++++++++++++ 内连接查询 (手动指定on的条件) ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user INNER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_4 = conn.execute( |
|  |  select(user\_table.c.username, article\_table.c.title).select\_from( |
|  |  user\_table).join(article\_table, user\_table.c.uid == article\_table.c.author\_id)) |
|  | print(result\_core\_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  | # -------------- ORM |
|  | result\_orm\_4 = session.execute( |
|  |  select(User.username, Article.title).select\_from( |
|  |  User).join(Article, User.uid == Article.author\_id)) |
|  | print(result\_orm\_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  |  |
|  | # 5 ++++++++++++++++++ 外连接查询 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password, article.title  |
|  | FROM user LEFT OUTER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_5 = conn.execute( |
|  |  select(user\_table, article\_table.c.title).join(article\_table, isouter=True)) |
|  | print(result\_core\_5.all()) |
|  | # [(1, '张三', '12345', 'C入门'), (1, '张三', '12345', 'C++入门'), (2, '李四', '12346', 'python入门')] |
|  | # -------------- ORM |
|  | result\_orm\_5 = session.execute( |
|  |  select(User, Article.title).join(Article, isouter=True)) |
|  | print(result\_orm\_5.all()) # [(, 'C入门'), (, 'C++入门'), (, 'python入门')] |
|  |  |
|  | # 6 ++++++++++++++++++ 完全连接查询 ++++++++++++++++++ |
|  | # # !!! 注意: MYSQL 中没有 FULL OUTER JOIN, 执行时会报错 |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password, article.title  |
|  | FROM user LEFT OUTER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_6 = conn.execute( |
|  |  select(user\_table, article\_table.c.title).join(article\_table, full=True)) |
|  | print(result\_core\_6.all()) |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_6 = session.execute( |
|  |  select(User, Article.title).join(Article, full=True)) |
|  | print(result\_orm\_6.all()) |
|  |  |

注意: 使用ORM的方式进行连表操作时, 可以通过relationship, 即不需要直接指定第二张表, 指定relationship

比如:

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username, article.title  |
|  | FROM user INNER JOIN article ON user.uid = article.author\_id |
|  | """ |
|  |  |
|  | result\_orm = session.execute( |
|  |  select(User.username, Article.title).select\_from(User).join(User.article)) |
|  | print(result\_orm.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')] |
|  |  |

UNION和UNION ALL

UNION ALL: 合并两个或多个SELECT语句的结果, 结果不会去重
UNION: 合并两个或多个SELECT语句的结果, 结果会去重

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select, union\_all, union |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ UNION ALL ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid = 1  |
|  | UNION ALL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid = 2 |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_1\_stmt1 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_core\_1\_stmt2 = select(user\_table).where(user\_table.c.uid == 2) |
|  | result\_core\_1\_u = union\_all(result\_core\_1\_stmt1, result\_core\_1\_stmt2) |
|  |  |
|  | result\_core\_1 = conn.execute(result\_core\_1\_u) |
|  | print(result\_core\_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_1\_stmt1 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_orm\_1\_stmt2 = select(user\_table).where(user\_table.c.uid == 2) |
|  | result\_orm\_1\_u = union\_all(result\_orm\_1\_stmt1, result\_orm\_1\_stmt2) |
|  |  |
|  | result\_orm\_1 = session.execute(result\_orm\_1\_u) |
|  | print(result\_orm\_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')] |
|  |  |
|  | # 2 ++++++++++++++++++ UNION ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid = 1  |
|  | UNION |
|  | SELECT user.uid, user.username, user.password  |
|  | FROM user  |
|  | WHERE user.uid = 1 |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | result\_core\_2\_stmt1 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_core\_2\_stmt2 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_core\_2\_u = union(result\_core\_2\_stmt1, result\_core\_2\_stmt2) |
|  |  |
|  | result\_core\_2 = conn.execute(result\_core\_2\_u) |
|  | print(result\_core\_2.all()) # [(1, '张三', '12345')] |
|  |  |
|  | # -------------- ORM |
|  | result\_orm\_2\_stmt1 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_orm\_2\_stmt2 = select(user\_table).where(user\_table.c.uid == 1) |
|  | result\_orm\_2\_u = union(result\_orm\_2\_stmt1, result\_orm\_2\_stmt2) |
|  |  |
|  | result\_orm\_2 = session.execute(result\_orm\_2\_u) |
|  | print(result\_orm\_2.all()) # [(1, '张三', '12345')] |
|  |  |

子查询

即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
SELECT * FROM data WHERE EXISTS (SELECT name FROM user);的查询
或使用WITH temp AS (...) 作为临时表
注意: 在使用子查询后,SQL语句的查询性能变得非常糟糕, 至于如何取舍看个人权衡了

关于子查询的优化, 见: 深入理解MySql子查询IN的执行和优化

|  | from sqlalchemy.orm import sessionmaker |
|  | from sqlalchemy import select, func |
|  |  |
|  |  |
|  | conn = engine.connect() # engine 在上面定义 |
|  |  |
|  | SessionLocal = sessionmaker(bind=engine) |
|  | session = SessionLocal() |
|  |  |
|  | # 1 ++++++++++++++++++ EXISTS 子查询 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT user.username  |
|  | FROM user  |
|  | WHERE EXISTS (SELECT count(article.author\_id) AS count\_1  |
|  | FROM article GROUP BY article.author\_id  |
|  | HAVING count(article.author\_id) >= 2) |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | subquery\_core\_1 = (select(func.count(article\_table.c.author\_id)).group\_by( |
|  |  article\_table.c.author\_id).having( |
|  |  func.count(article\_table.c.author\_id) >= 2)).exists() |
|  |  |
|  | result\_core\_1 = conn.execute(select(user\_table.c.username).where(subquery\_core\_1)) |
|  | print(result\_core\_1.all()) # [('张三',), ('李四',)] |
|  |  |
|  | # -------------- ORM |
|  | subquery\_orm\_1 = (select(func.count(Article.author\_id)).group\_by( |
|  |  Article.author\_id).having( |
|  |  func.count(Article.author\_id) >= 2)).exists() |
|  |  |
|  | result\_orm\_1 = session.execute(select(User.username).where(subquery\_orm\_1)) |
|  | print(result\_orm\_1.all()) # [('张三',), ('李四',)] |
|  |  |
|  | # 2 ++++++++++++++++++ 其它 子查询 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | SELECT article.title, anon\_1.username  |
|  | FROM article, (SELECT user.uid AS uid, user.username AS username  |
|  | FROM user  |
|  | WHERE user.uid = 1) AS anon\_1  |
|  | WHERE article.author\_id = anon\_1.uid |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | subquery\_core\_2 = select(user\_table.c.uid, user\_table.c.username).where( |
|  |  user\_table.c.uid == 1).subquery() |
|  |  |
|  | result\_core\_2 = conn.execute( |
|  |  select(article\_table.c.title, subquery\_core\_2.c.username).where( |
|  |  article\_table.c.author\_id == subquery\_core\_2.c.uid)) |
|  | print(result\_core\_2.all()) # [('C++入门', '张三'), ('C入门', '张三')] |
|  |  |
|  | # # -------------- ORM |
|  | subquery\_orm\_2 = select(User.uid, User.username).where(User.uid == 1).subquery() |
|  |  |
|  | result\_orm\_2 = session.execute( |
|  |  select(Article.title, subquery\_orm\_2.c.username).where( |
|  |  Article.author\_id == subquery\_orm\_2.c.uid)) |
|  | print(result\_orm\_2.all()) # [('C++入门', '张三'), ('C入门', '张三')] |
|  |  |
|  | # 3 ++++++++++++++++++ with 添加临时表 ++++++++++++++++++ |
|  | """ |
|  | 对应SQL |
|  | WITH anon\_1 AS  |
|  | (SELECT user.uid AS uid, user.username AS username  |
|  | FROM user  |
|  | WHERE user.uid = 1) |
|  |  SELECT article.title, anon\_1.username  |
|  | FROM article, anon\_1  |
|  | WHERE article.author\_id = anon\_1.uid |
|  | """ |
|  |  |
|  | # -------------- Core |
|  | subquery\_core\_3 = select(user\_table.c.uid, user\_table.c.username).where( |
|  |  user\_table.c.uid == 1).cte() |
|  |  |
|  | result\_core\_3 = conn.execute( |
|  |  select(article\_table.c.title, subquery\_core\_3.c.username).where( |
|  |  article\_table.c.author\_id == subquery\_core\_3.c.uid)) |
|  | print(result\_core\_3.all()) # [('C++入门', '张三'), ('C入门', '张三')] |
|  |  |
|  | # -------------- ORM |
|  | subquery\_orm\_3 = select(User.uid, User.username).where(User.uid == 1).cte() |
|  |  |
|  | result\_orm\_3 = session.execute( |
|  |  select(Article.title, subquery\_orm\_3.c.username).where( |
|  |  Article.author\_id == subquery\_orm\_3.c.uid)) |
|  | print(result\_orm\_3.all()) # [('C++入门', '张三'), ('C入门', '张三')] |
|  |  |

从1.x迁移到2.0的接口

一些Query的接口用起来还是特别好用的, 因此在2.0风格的接口中, 一些接口仍然可以使用

get根据主键查询

注意: 这是session的方法

|  | # ---------------- 1.x |
|  | session.query(User).get(42) |
|  | # ---------------- 2.0 |
|  | session.get(User, 42) |

filter_by简单查询

注意: 这是select的方法

|  | result = session.execute( |
|  |  select(User).filter\_by(username="张三") |
|  | ) |
|  | print(result.all()) # [(,)] |
|  |  |

filter复杂查询

注意: 这是select的方法

|  | result = session.execute( |
|  |  select(User).filter(User.username == "张三") |
|  | ) |
|  | print(result.all()) # [(,)] |
|  |  |

1.x与2.0的ORM接口对比

以下表格来自于: 2.0 Migration - ORM Usage

一些类的介绍

Result

表示从数据库中返回的结果, 一行数据使用Row对象表示, 关于Row, 见: Row

从SQLAlchemy1.4开始, Core和ORM的结果(Result), 使用的接口相同

|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | #  |
|  |  print(type(session\_res)) |
|  |  |
|  | with engine.connect() as conn: |
|  |  conn\_res = conn.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | #  |
|  |  print(type(conn\_res)) |
|  |  |

注意: 例子中的teacher表的数据为:

|  | +-----+----------+ |
|  | | tid | name | |
|  | +-----+----------+ |
|  | | 1 | 语文老师 | |
|  | | 2 | 英语老师 | |
|  | | 3 | 数学老师 | |
|  | +-----+----------+ |

Result的全部方法

  • unique(strategy=None)
    去重, 但需要注意何时调用, 应该在调用如.all()这种生成Row的方法之前调用, 否则返回的对象都没有unique这个方法
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  |  session\_res = session\_res1.merge(session\_res2) |
|  |  |
|  | # [1, 2, 3, 1, 2, 3] ==> [1, 2, 3] |
|  |  print(session\_res.scalars().unique().all()) |
|  |  |
  • all
    返回所有Row数据的列表, 之后的调用将返回一个空列表
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  | # ---------------------------------- 第一次调用all |
|  |  first\_all = session\_res.all() |
|  |  print(type(session\_res.all())) #  |
|  | for row in first\_all: |
|  |  print(type(row)) #  |
|  |  print(f"{row.tid}-{row.name}") |
|  |  |
|  | # ---------------------------------- 第二次调用all |
|  |  print(session\_res.all()) # [] |
|  |  |
  • fetchall()
    all方法一样
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  first\_fetchall = session\_res.fetchall() |
|  |  second\_fetchall = session\_res.fetchall() |
|  | # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')] |
|  |  print(first\_fetchall) |
|  | # [] |
|  |  print(second\_fetchall) |
|  |  |
  • fetchmany(size=None)
    取多行数据, size表示取多少行数据 , 当所有行都用完时, 返回一个空列表
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | # 一共3行数据 |
|  |  |
|  | # 取两行数据: [(1, '语文老师'), (2, '英语老师')] |
|  |  print(session\_res.fetchmany(2)) |
|  |  |
|  | # 取一行数据: [(3, '数学老师')] |
|  |  print(session\_res.fetchmany(1)) |
|  |  |
|  | # 没有数据了, 返回空列表: [] |
|  |  print(session\_res.fetchmany(1)) |
  • fetchone()
    取一行数据, 当所有行都用完时,返回None
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | while True: |
|  |  row = session\_res.fetchone() |
|  | if not row: |
|  | break |
|  |  print(row) |
|  | """ |
|  |  (1, '语文老师') |
|  |  (2, '英语老师') |
|  |  (3, '数学老师') |
|  |  """ |
|  |  |
  • first()
    获取第一行数据,关闭Result并丢弃其余行, 如果没有行,则不获取 (即返回值为None)
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  | # (1, '语文老师') |
|  |  print(session\_res.first()) |
|  |  |
|  | # !!! 由于Result已经关闭, 继续操作会报错: |
|  | # sqlalchemy.exc.ResourceClosedError: This result object is closed. |
|  |  print(session\_res.first()) |
|  |  |
  • one()
    只返回一行数据或引发异常, 并关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher where tid=1;") |
|  |  ) |
|  |  |
|  | # (1, '语文老师') |
|  |  print(session\_res.one()) |
|  |  |
  • one_or_none()
    最多返回一行数据或引发异常, 并关闭Result (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher where tid < 1;") |
|  |  ) |
|  | # None |
|  |  print(session\_res1.one\_or\_none()) |
|  |  |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher where tid = 1;") |
|  |  ) |
|  | # (1, '语文老师') |
|  |  print(session\_res2.one\_or\_none()) |
|  |  |
  • columns(*col_expressions)
    限制返回列, 也可以对列进行重新排序
    即假如结果的列为(a, b, c, d), 但我只需要a和b, 那么只需要result.columns("a", "b"), 你还可以调整它们的顺序, 以方便解包
    注意: 这会修改Result的列, 且该方法的返回值就是修改后的Result对象
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  | for row in session\_res1.columns("tid"): |
|  | # 返回值时修改后的Result |
|  |  |
|  | # 获取字段名元组 |
|  |  print(row.\_fields) # ('tid',) |
|  |  |
|  | # 会修改原Result |
|  |  session\_res2.columns("name") |
|  | for row in session\_res2: |
|  | # 获取字段名元组 |
|  |  print(row.\_fields) # ('name',) |
|  |  |
  • scalar()
    获取第一行的第一列数据, 并关闭Result. 如果没有要获取的行, 则返回None
    如: [(1, "lczmx"), (2, "jack")], 返回 1
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  print(session\_res.scalar()) # 1 |
  • scalar_one()
    只返回一行数据的第一列或引发异常, 并关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one() + Result.scalar()
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher where tid=1;") |
|  |  ) |
|  |  |
|  |  print(session\_res.scalar\_one()) # 1 |
|  |  |
  • scalar_one_or_none()
    最多返回一行数据的第一列或引发异常, 并关闭Result (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one_or_none() + Result.scalar()
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher where tid<1;") |
|  |  ) |
|  |  |
|  |  print(session\_res1.scalar\_one\_or\_none()) # None |
|  |  |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher where tid=1;") |
|  |  ) |
|  |  |
|  |  print(session\_res2.scalar\_one\_or\_none()) # 1 |
|  |  |
  • scalars(index=0)
    返回一个ScalarResult对象, 该对象以每行数据的 第index列 元素作为数据 (而不是ResultRow)
    该对象的方法有: unique partitions fetchall fetchmany all first one_or_none one
    具体使用与Result类似
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  | # [1, 2, 3] |
|  |  print(session\_res.scalars().all()) |
|  |  |
  • mappings()
    返回一个MappingResult对象, MappingResult对象与Result对象类似, 但是一行数据使用RowMapping对象表示, RowMapping对象类似于字典对象, 简而言之: 调用该方法, 你可以将一行数据由类元组变为类字典
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  | for d in session\_res.mappings(): |
|  | # 像操作字典一样操作 d 即可 |
|  |  print(d.get("tid"), d.get("name")) |
|  |  |
  • keys()
    从SQLAlchemy1.4起 (之前的版本返回一个列表), 该方法将返回一个RMKeyView对象, 该对象可迭代, 其_keys属性存放列的名称, 由于实现的__contains__方法, 因此也可以使用in运算符作判断.
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  | from collections import Iterable |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  |  |
|  |  keys = session\_res.keys() |
|  |  print(keys) # RMKeyView(['tid', 'name']) |
|  |  |
|  | #  |
|  |  print(type(keys)) |
|  |  |
|  | # 可迭代的 |
|  |  print(isinstance(keys, Iterable)) # True |
|  |  |
|  | if "name" in keys: |
|  |  print("name in keys") |
|  |  |
  • freeze()
    可以对Result进行缓存, 见官方文档: Re-Executing Statements
  • merge(*others)
    该方法合并其他Result, 返回一个MergedResult对象, 你可以像一般的Result一样操作它, 但是取值的时候注意游标的位置(MergedResult关闭, Result也关闭; MergedResult取完了值, Result的值也被取完)
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher where tid=1;") |
|  |  ) |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher where tid=2;") |
|  |  ) |
|  |  |
|  |  session\_res = session\_res2.merge(session\_res1) |
|  |  |
|  | # 注意 先session\_res2再session\_res1 |
|  |  |
|  | # (2, '英语老师') |
|  |  print(session\_res.fetchone()) |
|  |  |
|  | # [(1, '语文老师')] |
|  |  print(session\_res1.all()) |
|  |  |
|  | # session\_res已经取过一次 |
|  | # 所以返回: [] |
|  |  print(session\_res2.all()) |
|  |  |
  • partitions(size=None)
    迭代生成size大小的行的子列表, sizeNone时调用Result.yield_per(), 否则调用Result.fetchmany(size)
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res1 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | # 每次迭代 取 1行 数据 |
|  | for i in session\_res1.partitions(): |
|  |  print(i) |
|  | """ |
|  |  [(1, '语文老师')] |
|  |  [(2, '英语老师')] |
|  |  [(3, '数学老师')] |
|  |  """ |
|  |  |
|  |  session\_res2 = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | # 每次迭代 取 2行 数据 |
|  | for i in session\_res2.partitions(2): |
|  |  print(i) |
|  | """ |
|  |  [(1, '语文老师'), (2, '英语老师')] |
|  |  [(3, '数学老师')] |
|  |  """ |
|  | # 已经迭代完了, 就没有值了 |
|  |  print(list(session\_res2.partitions())) # [] |
|  |  |
  • yield_per(num)
    迭代num行数据, 返回的是Result对象
|  | from sqlalchemy import create\_engine, text |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  text("select tid, name from teacher;") |
|  |  ) |
|  | # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')] |
|  |  print(session\_res.yield\_per(1).all()) |
|  |  |
  • close
    关闭此Result, 再操作的话会抛出异常: sqlalchemy.exc.ResourceClosedError: This result object is closed.

Row

一般来说, 一行数据是一个Row对象
常用的属性或方法:

属性/方法 属性或方法 描述
_asdict 方法 返回字段名与值的字典数据 并添加到 _mapping属性
_fields 属性 返回字符名的元组
_mapping 属性 返回字段名与值的字典数据

一般使用:
注意: 我们每一次迭代Result对象, 得到的是Row对象

通过属性取值

|  | result = conn.execute(text("select x, y from some\_table")) |
|  |  |
|  | for row in result: |
|  |  y = row.y |
|  |  |
|  | # illustrate use with Python f-strings |
|  |  print(f"Row: {row.x} {row.y}") |
|  |  |

在ORM中, select(Article)的属性是Article, select(Article.title)的属性是title

通过元组解包

|  | result = conn.execute(text("select x, y from some\_table")) |
|  |  |
|  | for x, y in result: |
|  | # ... |
|  |  |

通过索引取值

|  | result = conn.execute(text("select x, y from some\_table")) |
|  |  |
|  | for row in result: |
|  |  x = row[0] |
|  |  |

MetaData

MetaData是包含TableEngine的对象, 也就是说它主要是用来管理Table (表)的
下面列出MetaData对象的一些方法

方法 参数 描述
clear 清除此元数据中的所有表对象
create_all(bind=None, tables=None, checkfirst=True) bind: 数据库Engine tables: Table对象列表 checkfirst: 是否 仅不存在表时 创建 在数据库中创建 元数据中的所有表
drop_all(bind=None, tables=None, checkfirst=True) bind: 数据库Engine tables: Table对象列表 checkfirst: 是否 仅存在表时 删除 在数据库中删除 元数据中存储的所有表
remove(table) table: 表对象 从此元数据中删除给定的表对象
tables tables是属性, 无参数 返回Table的字段对象

内置函数

常用的SQL函数:

函数名 对应的SQL函数 描述
max MAX 返回一组值中的最大值
min MIN 返回一组值中的最小值
count COUNT 返回匹配指定条件的行数, 没有参数时为: COUNT(*)
sum SUM 计算一组值的总和
rank RAND 产生 0 至 1 之间的随机数
concat CONCAT 多个字符串连接为一个字符串
char_length CHAR_LENGTH 计算字符串字符数
coalesce COALESCE 接受一系列的表达式或列, 返回第一个非空的值
session_user SESSION_USER 返回当前连接的当前用户名和主机名, 形如: root@localhost
user USER 返回连接的当前用户名和主机名, 形如: root@localhost
current_user CURRENT_USER 返回用户名和主机名, 形如: root@localhost
current_date CURRENT_DATE 函数返回当前日期, 格式: YYYY-MM-DD
current_time CURRENT_TIME 返回当前时间, 格式: HH-MM-SS
current_timestamp CURRENT_TIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
localtime LOCALTIME 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
localtimestamp LOCALTIMESTAMP 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
now NOW 返回当前日期和时间, 格式: YYYY-MM-DD HH-MM-SS
sysdate SYSDATE 返回当前日期和时间, 格式: YYYY-MM-DD HH:MM:SS
array_agg ARRAY_AGG PostgreSql可用, 把表达式变成一个数组
dense_rank DENSE_RANK 用于排名, 见: MySQL DENSE_RANK() 函数
percent_rank PERCENT_RANK 计算分区或结果集中行的百分位数排名
Function 描述一个SQL函数, 见: Function
GenericFunction 见: GenericFunction
grouping_sets GROUPING SETS 定义分组集, 见: SQL Grouping Sets运算符
rollup ROLLUP 生成小计和总计, 见: MySQL ROLLUP
cube 见: cube
cume_dist 见: cume_dist

全部函数见: SQL and Generic Functions

使用例子:

|  | from sqlalchemy import create\_engine |
|  | from sqlalchemy.orm import declarative\_base, sessionmaker |
|  | from sqlalchemy import Column, Integer, String |
|  | from sqlalchemy import select, func |
|  |  |
|  | # 导入公共基类 |
|  | Base = declarative\_base() |
|  | # 数据库配置 |
|  | DATABASE\_CONFIG = { |
|  | "username": "root", |
|  | "password": "123456", |
|  | "host": "localhost", |
|  | "database": "test" |
|  | } |
|  | # 连接mysql |
|  | engine = create\_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE\_CONFIG), |
|  |  echo=True, future=True) |
|  | Session = sessionmaker(bind=engine) |
|  |  |
|  |  |
|  | class Teacher(Base): |
|  |  \_\_tablename\_\_ = "teacher" |
|  |  |
|  |  tid = Column("tid", Integer, primary\_key=True, autoincrement=True) |
|  |  |
|  |  name = Column("name", String(10), nullable=False, comment="教师名") |
|  |  |
|  |  |
|  | with Session() as session: |
|  |  session\_res = session.execute( |
|  |  select(func.count()).select\_from(Teacher) |
|  |  ) |
|  | # (3,) |
|  |  print(session\_res.one()) |
|  |  |

Column定义

一个Column即表的一列数据, 和我们用SQL语句定义一列数据一样, 参数主要包括: 字段名、字段类型、约束条件, 比如:

|  | from sqlalchemy import Column, String |
|  |  |
|  | Column("name", String(30), unique=True, nullable=False, comment='姓名') |
|  |  |

字段类型, 一般你可以用直接指定数据库的字段类型, 也可以让SQLAlchemy的DDL自动选择字段类型
直接使用数据库的字段类型

字段类型 描述
ARRAY(item_type, ...) 数组类型, 目前只支持PostgreSQL, 因此建议用: sqlalchemy.dialects.postgresql.ARRAY
BIGINT SQL BIGINT类型
BINARY(length) SQL BINARY类型
BLOB SQL BLOB类型
BOOLEAN SQL布尔类型
CHAR SQLCHAR类型
CLOB SQL CLOB型
DATE SQL DATE期类型
DATETIME SQL DATETIME类型
DECIMAL SQL DECIMAL类型
FLOAT SQL FLOAT类型
INT sqlalchemy.sql.sqltypes.INTEGER的别名
INTEGER SQL INT或INTEGER类型
JSON SQL JSON类型
NCHAR SQL NChar类型
NUMERIC SQL NUMERIC类型
NVARCHAR SQL NVARCHAR类型
REAL SQL REAL类型
SMALLINT SQL SMALLINT类型
TEXT SQL TEXT类型
TIME SQL TIME类型
TIMESTAMP SQL TIMESTAMP类型
VARBINARY SQLVARBINARY类型
VARCHAR SQL VARCHAR类型

关于SQL的数据类型, 见: SQL 数据类型

自动转化的字段类型

字段类型 描述 通常对应的字段类型
Boolean 布尔数据类型 Boolean或SMALLINT
String 所有字符串和字符类型的基 VARCHAR
Text 大小可变的字符串类型 CLOB或TEXT
LargeBinary 大的二进制字节数据类型 BLOB或BYTEA
Unicode 长度可变的Unicode字符串类型
UnicodeText 无限长的Unicode字符串类型
SmallInteger 较小的一种 int 整数 SMALLINT
Integer int类型
BigInteger BIGINT数据类型 BIGINT
Numeric 用于固定精度数字的类型 NUMERIC或DECIMAL
Float 浮点类型 FLOAT 或 REAL .
Date datetime.date类型
Time datetime.time类型
DateTime datetime.datetime类型
Interval datetime.timedelta类型
Enum 枚举类型 ENUM或VARCHAR
PickleType 保存使用pickle序列化的python对象 二进制类型
SchemaType 将类型标记为可能需要架构级DDL才能使用
MatchType 引用match运算符的返回类型 MySQL的是浮点型

约束条件

约束条件和其他参数 描述
autoincrement 是否自增
default 设置默认参数, 可以是可调用对象
index 是否创建索引
info SchemaItem.info的属性
nullable 是否非空, Falsenot null
primary_key 是否为主键
unique 是否唯一
comment 注释字段, 会写入SQL 中的COMMENT
onupdate 调用更新数据时传入的值, 如: updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
server_default 为SQLAlchemy的DDL设置默认值, 可以是str unicode text(), 如: sqlalchemy.func.now()

注 : sqlalchemy.func, 用于生成SQL函数表达式, 详情见: 内置函数

Column的例子:

|  | from sqlalchemy import DateTime, func |
|  |  |
|  |  |
|  | class Data(Base): |
|  |  \_\_tablename\_\_ = 'data' |
|  | id = Column(Integer, primary\_key=True, index=True, autoincrement=True) |
|  |  created\_at = Column(DateTime, server\_default=func.now(), comment='创建时间') |
|  |  updated\_at = Column(DateTime, server\_default=func.now(), onupdate=func.now(), comment='更新时间') |
|  |  |

SQLAlchemy完全入门相关推荐

  1. SQLAlchemy简单入门

    SQlAlchemy简单使用 sqlalchemy介绍 SQLAlchemy的是Python的SQL工具包和对象关系映射器,让应用程序开发人员可以使用上SQL的强大功能和灵活性. 它提供了一套完整的企 ...

  2. SQLAlchemy文档翻译

    想记录一下SQLAlchemy的入门学习,然后突发奇想觉得:为什么不直接翻译一下文档呢?于是顺手查了查怎么使用Gitbook,2333 于是就在Github开了项目,然后导入了Gitbook,开始写. ...

  3. Python SQLAlchemy入门教程

    原文:https://www.cnblogs.com/ybjourney/p/11832045.html Python SQLAlchemy入门教程 一.介绍 关于ORM 为什么用sqlalchemy ...

  4. sqlalchemy mysql教程_SQLAlchemy 教程 —— 基础入门篇

    SQLAlchemy 教程 -- 基础入门篇 一.课程简介 1.1 实验内容 本课程带领大家使用 SQLAlchemy 连接 MySQL 数据库,创建一个博客应用所需要的数据表,并介绍了使用 SQLA ...

  5. 【SQLAlchemy】第一篇——入门

    1.背景 许多年以前,在我刚接触数据科学和数据库的时候,经常需要从MySQL中获取数据进行计算.一开始采用的方法是使用pymysql执行SQL语句,然后将返回的结果处理成pandas的DataFram ...

  6. SQLAlchemy简介与入门

    ORM与SQLAlchemy简介 ORM ORM:Object Relation Mapping,最初主要描述的是程序中的Object对象和关系型数据库中Rlation关系(表)之间的映射关系,目前来 ...

  7. SQLAlchemy 教程 —— 基础入门篇

    参照博客:https://blog.csdn.net/qq_43355223/article/details/83024430 filter operator : 检索返回的列表,以及列表的标量 : ...

  8. orm设置bool型 python_Python SQLAlchemy入门教程

    本文将以Mysql举例,介绍sqlalchemy的基本用法.其中,Python版本为2.7,sqlalchemy版本为1.1.6. 一. 介绍 SQLAlchemy是Python中最有名的ORM工具. ...

  9. eric python mysql,python入门教程13-07 (python语法入门之ORM框架SQLAlchemy)

    本章节主要介绍了python的orm框架的介绍以及作用,ORM框架的类型及运用方法,下面我们一起来看看吧! 一.面向对象应用场景: 1.函数有共同参数,解决参数不断重用: 2.模板(约束同一类事物的, ...

最新文章

  1. 通过xmanager远程连接redhat linux as 5
  2. Linux的文件系统
  3. 基于“分布 —— 多分布” 的点云配准方法
  4. Android的px、dp和sp
  5. IE兼容问题 动态生成的节点IE浏览器无法触发
  6. 一发工资就全部取出,会对银行流水有影响吗?
  7. java跳转_java servlet 几种页面跳转的方法
  8. 推荐一款强大的SQL Internal 查看工具InternalsViewer
  9. 2018 OpenInfra Days China 大咖来袭——开源,我们是认真的
  10. 博科光交机SNMP配置
  11. 用 1 行 Python 代码实现 FTP 服务器 - Pyftpdlib
  12. jeDate日期控件
  13. 浅谈周大福的积分运营
  14. No code “EPSG:4326“ from authority “EPSG“
  15. 让你嘿嘿嘿!最新windows7升级win10方法!
  16. 卷积神经网络(CNN)到底是个什么鬼东西(结构及作用)
  17. Why Transformer works
  18. Android实现网络视频播放
  19. python包离线安装教程_Linux下离线安装Python项目的依赖包
  20. ios 自带录屏框架replayKit的使用

热门文章

  1. 关于360和QQ,我说几句
  2. 搞不懂c++ 的函数声明与定义
  3. android nullable jar,java – 导入NotNull或Nullable,Android Studio将无法编译
  4. python实现简单对话机器人_采用python实现简单QQ单用户机器人的方法
  5. VK1650是LED数显驱动IC/数码管显示驱动控制电路(IC),8*4共阴/4*8共阳驱动,SOP16封装,可兼容替代市面上1650,FAE技术支持
  6. 墙王谓古哥曰 Recent GoogleGate in Classical Chinese
  7. java日期相差周_利用Java中Calendar计算两个日期之间的天数和周数
  8. 程序员,请对自己好一点!
  9. Flex中如何创建放射线状填充列图(ColumnChart)图表的例子
  10. docker运行yyets_docker 后台运行和进入后台运行的容器