基本使用
SQLAlchemy 的session
是用于管理数据库操作的一个像容器一样的东西. 模型实例对象本身独立存在, 而要让其修改(创建)生效, 则需要把它们加入某个session
. 同时你也可以把模型实例对象从session
中去除. 被session
管理的实例对象, 在session.commit()
时被提交到数据库. 同时session.rollback()
是回滚变更.
session.flush()
的作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库. 比如自增ID
被填充上值.
1 | user = User(name=u'名字') |
for update
SQLAlchemy 的 Query 支持 select ... for update / share
.
1 | session.Query(User).with_for_update().first() |
完整形式是:
1 | with_for_update(read=False, nowait=False, of=None) |
read
是标识加互斥锁还是共享锁. 当为True
时, 即for share
的语句, 是共享锁. 多个事务可以获取共享锁, 互斥锁只能一个事务获取. 有”多个地方”都希望是”这段时间我获取的数据不能被修改, 我也不会改”, 那么只能使用共享锁.
nowait
其它事务碰到锁, 是否不等待直接”报错”.
of
指明上锁的表, 如果不指明, 则查询中涉及的所有表(行)都会加锁.
事务嵌套
SQLAlchemy 中的事务嵌套有两种情况. 一是在session
中管理的事务, 本身有层次性. 二是session
和原始的connection
之间, 是一种层次关系, 在这session
,connection
两个概念之中的事务同样具有这样的层次.session
中的事务, 可能通过begin_nested()
方法做savepoint
:
1 | session.add(u1) |
或者使用上下文对象:
1 | for record in records: |
嵌套的事务的一个效果, 是最外层事务提交整个变更才会生效.
1 | user = User(name='2') |
于是, 前面说的第二种情况有一种应用方式, 就是在connection
上做一个事务, 最终也在connection
上回滚这个事务, 如果session
是bind
到这个连接上的, 那么 session
上所做的更改全部不会生效:
1 | conn = Engine.connect() |
在测试中这种方式可能会有用.
二段式提交
二段式提交, Two-Phase
, 是为解决分布式环境下多点事务控制的一套协议.
与一般事务控制的不同是, 一般事务是 begin
, 之后 commit
结束.
而二段式提交的流程上, begin
之后, 是 prepare transaction
transaction_id
, 这时相关事务数据已经持久化了. 之后, 再在任何时候(哪怕重启服务), 作 commit prepared
transaction_id
或者 rollback prepared
transaction_id
.
从多点事务的控制来看, 应用层要做的事是, 先把任务分发出去, 然后收集”事务准备”的状态(prepare transaction 的结果). 根据收集的结果决定最后是commit
还是rollback
.
简单来说, 就是事务先保存, 再说提交的事.
SQLAlchemy 中对这个机制的支持, 是在构建会话类是加入twophase
参数:
1 | Session = sessionmaker(twophase=True) |
然后会话类可以根据一些策略, 绑定多个Engine
, 可以是多个数据库连接, 比如:
1 | Session = sessionmaker(twophase=True) |
这样, 在获取一个会话实例之后, 就处在二段式提交机制的支持之下, SQLAlchemy 自己会作多点的协调了. 完整的流程:
1 | Engine = create_engine('postgresql://test@localhost:5432/test', echo=True) |
对应的SQL
大概就是:
1 | begin; |
使用时, Postgresql
数据库需要把 max_prepared_transactions
这个配置项的值改成大于 0
Example
如何正确使用事务?
假设有一个简单的银行系统,一共两名用户:
1 | class User(BaseModel): |
然后开两个 session,同时进行两次转账操作:
1 | session1 = DB_Session() |
现在看看结果:
1 |
|
两次转账都成功了,但是只转走了一笔钱,这明显不科学。
可见 MySQL InnoDB 虽然支持事务,但并不是那么简单的,还需要手动加锁。
首先来试试读锁:
1 | user1 = session1.query(User).with_lockmode('read').get(1) |
现在在执行 session1.commit() 的时候,因为 user1 和 user2 都被 session2 加了读锁,所以会等待锁被释放。超时以后,session1.commit() 会抛出个超时的异常,如果捕捉了的话,或者 session2 在另一个进程,那么 session2.commit() 还是能正常提交的。这种情况下,有一个事务是肯定会提交失败的,所以那些更改等于白做了。
接下来看看写锁,把上段代码中的 ‘read’ 改成 ‘update’ 即可。这次在执行 select 的时候就会被阻塞了:
1 | user1 = session2.query(User).with_lockmode('update').get(1) |
这样只要在超时期间内,session1 完成了提交或回滚,那么 session2 就能正常判断 user1.money >= 100 是否成立了。
由此可见,如果需要更改数据,最好加写锁。
那么什么时候用读锁呢?如果要保证事务运行期间内,被读取的数据不被修改,自己也不去修改,加读锁即可。
举例来说,假设我查询一个用户的开支记录(同时包含余额和转账记录),可以直接把 user 和 tansefer_log 做个内连接。
但如果用户的转账记录特别多,我在查询前想先验证用户的密码(假设在 user 表中),确认相符后才查询转账记录。而这两次查询的期间内,用户可能收到了一笔转账,导致他的 money 字段被修改了,但我在展示给用户时,用户的余额仍然没变,这就不正常了。
而如果我在读取 user 时加了读锁,用户是无法收到转账的(因为无法被另一个事务加写锁来修改 money 字段),这就保证了不会查出额外的 tansefer_log 记录。等我查询完两张表,释放了读锁后,转账就可以继续进行了,不过我显示的数据在当时的确是正确和一致的。
另外要注意的是,如果被查询的字段没有加索引的话,就会变成锁整张表了:
1 | session1.query(User).filter(User.id > 50).with_lockmode('update').all() |
要避免的话,可以这样:
1 | money = Column(DECIMAL(10, 2), index=True) |
另一个注意点是子事务。
InnoDB 支持子事务(savepoint 语句),可以简化一些逻辑。
例如有的方法是用于改写数据库的,它执行时可能提交了事务,但在后续的流程中却执行失败了,却没法回滚那个方法中已经提交的事务。这时就可以把那个方法当成子事务来运行了:
1 | def step1(): |
此外,rollback 一个子事务,可以释放这个子事务中获得的锁,提高并发性和降低死锁概率。
如何对一个字段进行自增操作?
最简单的办法就是获取时加上写锁:
1 | user = session.query(User).with_lockmode('update').get(1) |
如果不想多一次读的话,这样写也是可以的:
1 | session.query(User).filter(User.id == 1).update({ |