事务和连接管理
管理交易
在 1.4 版更改: 对会话事务管理进行了修改,使其更清晰、更易于使用。特别是,它现在具有“autobegin”操作的特点,这意味着事务开始的点可以被控制,而不必使用传统的“autocommit”模式。
这个 Session
一次跟踪单个“虚拟”事务的状态,使用一个名为 SessionTransaction
。然后,该对象利用基础的 Engine
或引擎,而这些引擎 Session
对象进行绑定,以便使用 Connection
对象。
或者在需要时使用“虚拟事务”自动启动 Session.begin()
方法。在很大程度上,Python上下文管理器的使用在创建级别上都得到了支持 Session
对象以及维护 SessionTransaction
.
下面,假设我们从 Session
::
from sqlalchemy.orm import Session session = Session(engine)
我们现在可以使用上下文管理器在已划分的事务中运行操作:
with session.begin(): session.add(some_object()) session.add(some_other_object()) # commits transaction at the end, or rolls back if there # was an exception raised
在上述上下文的末尾,假设没有引发异常,任何挂起的对象都将刷新到数据库中,并提交数据库事务。如果在上述块中引发异常,则事务将回滚。在这两种情况下 Session
退出块之后,可以在后续事务中使用。
这个 Session.begin()
方法是可选的,并且 Session
也可以用在“按需提交”方法中,它将根据需要自动开始事务;这些事务只需要提交或回滚:
session = Session(engine) session.add(some_object()) session.add(some_other_object()) session.commit() # commits # will automatically begin again result = session.execute(< some select statement >) session.add_all([more_objects, ...]) session.commit() # commits session.add(still_another_object) session.flush() # flush still_another_object session.rollback() # rolls back still_another_object
这个 Session
其特点是 Session.close()
方法。如果 Session
如果在尚未提交或回滚的事务中开始,则此方法将取消(即回滚)该事务,并且还将删除包含在 Session
对象的状态。如果 Session
正在以这样一种方式使用 Session.commit()
或 Session.rollback()
不保证(例如,不在上下文管理器或类似工具中),则 close
方法可用于确保释放所有资源:
# expunges all objects, releases all transactions unconditionally # (with rollback), releases all database connections back to their # engines session.close()
最后,会话构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session
对象的使用范围在固定块内。通过 Session
先建设者:
with Session(engine) as session: session.add(some_object()) session.add(some_other_object()) session.commit() # commits session.add(still_another_object) session.flush() # flush still_another_object session.commit() # commits result = session.execute(<some SELECT statement>) # remaining transactional state from the .execute() call is # discarded
同样, sessionmaker
可以用同样的方式使用:
Session = sesssionmaker(engine) with Session() as session: with session.begin(): session.add(some_object) # commits # closes the Session
sessionmaker
包含自身 sessionmaker.begin()
方法以允许两个操作同时进行::
with Session.begin() as session: session.add(some_object):
使用保存点
如果基础引擎支持保存点事务,则可以使用 Session.begin_nested()
方法:
Session = sessionmaker() with Session.begin() as session: session.add(u1) session.add(u2) nested = session.begin_nested() # establish a savepoint session.add(u3) nested.rollback() # rolls back u3, keeps u1 and u2 # commits u1 and u2
每次 Session.begin_nested()
调用时,一个新的“BEGIN SAVEPOINT”命令将使用唯一标识符发送到数据库。什么时候 SessionTransaction.commit()
调用时,将在数据库上发出“Release SavePoint”,并且如果 SessionTransaction.rollback()
调用时,将发出“回滚到保存点”。
Session.begin_nested()
也可以用作上下文管理器,方式与 Session.begin()
方法:
for record in records: try: with session.begin_nested(): session.merge(record) except: print("Skipped record %s" % record) session.commit()
什么时候? Session.begin_nested()
被称为 Session.flush()
无条件发布(无论 autoflush
设置)。这样,当对该嵌套事务执行回滚时,会话的完整状态将过期,从而导致所有后续属性/实例访问引用 Session
就在之前 Session.begin_nested()
被叫来。
参见
NestedTransaction
-The NestedTransaction
类是由 Session
在内部生成保存点块。
会话级与引擎级事务控制
从SQLAlchemy 1.4开始 sessionmaker
核心 Engine
两个对象都支持 2.0 style 通过利用 Session.future
标志以及 create_engine.future
使这两个对象采用2.0样式的语义。
当使用future模式时,两个包之间应该在 sessionmaker
与 Engine
以及 Session
与 Connection
. 以下部分将根据以下方案详细说明这些方案:
ORM (using future Session) Core (using future engine) ----------------------------------------- ----------------------------------- sessionmaker Engine Session Connection sessionmaker.begin() Engine.begin() some_session.commit() some_connection.commit() with some_sessionmaker() as session: with some_engine.connect() as conn: with some_sessionmaker.begin() as session: with some_engine.begin() as conn: with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
边做边做
两个 Session
和 Connection
特征 Connection.commit()
和 Connection.rollback()
方法。使用SQLAlchemy 2.0样式的操作,这些方法会影响 最外层 所有情况下的交易。
发动机:
engine = create_engine("postgresql://user:pass@host/dbname", future=True) with engine.connect() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"} ] ) conn.commit()
会议:
Session = sessionmaker(engine, future=True) with Session() as session: session.add_all([ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three") ]) session.commit()
开始一次
两个 sessionmaker
和 Engine
特征a Engine.begin()
方法,该方法将获取一个新对象,用它来执行SQL语句(方法 Session
和 Connection
,然后返回将维护该对象的begin/commit/rollback上下文的上下文管理器。
发动机:
engine = create_engine("postgresql://user:pass@host/dbname", future=True) with engine.begin() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"} ] ) # commits and closes automatically
会议:
Session = sessionmaker(engine, future=True) with Session.begin() as session: session.add_all([ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three") ]) # commits and closes automatically
嵌套事务
当通过 Session.begin_nested()
或 Connection.begin_nested()
方法,则返回的事务对象必须用于提交或回滚保存点。召唤 Session.commit()
或 Connection.commit()
方法将始终提交 最外层 事务;这是SQLAlchemy 2.0特有的行为,与1.x系列相反。
发动机:
engine = create_engine("postgresql://user:pass@host/dbname", future=True) with engine.begin() as conn: savepoint = conn.begin_nested() conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"} ] ) savepoint.commit() # or rollback # commits automatically
会议:
Session = sessionmaker(engine, future=True) with Session.begin() as session: savepoint = session.begin_nested() session.add_all([ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three") ]) savepoint.commit() # or rollback # commits automatically
显式开始
在 1.4 版更改: SQLAlchemy 1.4不推荐使用“自动提交模式”,它以前是通过使用 Session.autocommit
旗子。未来,一种允许使用 Session.begin()
方法是新的“autobegin”行为,因此当 Session
是第一次构造的,或者在前一个事务结束之后,在它开始一个新的事务之前。
有关从依赖begin()/commit()对嵌套的框架从“subtransaction”模式迁移的背景信息,请参阅下一节 从“子事务”模式迁移 .
这个 Session
特点是“自动启动”行为,这意味着一旦操作开始,它就可以确保 SessionTransaction
以跟踪正在进行的操作。此事务在以下情况下完成 Session.commit()
被称为。
通常需要控制“开始”操作发生的点,尤其是在框架集成中。为了适应这种情况 Session
使用“自动注册”策略,这样 Session.begin()
方法可以直接为 Session
尚未开始交易:
Session = sessionmaker(bind=engine) session = Session() session.begin() try: item1 = session.query(Item).get(1) item2 = session.query(Item).get(2) item1.foo = 'bar' item2.bar = 'foo' session.commit() except: session.rollback() raise
使用上下文管理器更习惯地调用上述模式:
Session = sessionmaker(bind=engine) session = Session() with session.begin(): item1 = session.query(Item).get(1) item2 = session.query(Item).get(2) item1.foo = 'bar' item2.bar = 'foo'
这个 Session.begin()
方法和会话的“autobegin”进程使用相同的步骤序列开始事务。这包括 SessionEvents.after_transaction_create()
事件发生时调用事件;框架使用这个钩子,以便将自己的事务处理与ORM的事务处理集成 Session
.
从“子事务”模式迁移
1.4 版后已移除: 这个 Session.begin.subtransactions
标志已弃用。而 Session
仍然在内部使用“subtransactions”模式,它不适合最终用户使用,因为它会导致混乱,此外,它可能会从 Session
一旦“自动提交”模式被删除,它本身就处于2.0版中。
通常与autocommit模式一起使用的“subtransaction”模式在1.4中也被弃用。此模式允许使用 Session.begin()
方法,从而生成一个称为“subtransaction”的构造,该构造本质上是一个阻止 Session.commit()
方法。
这种模式在实际的应用程序中已经被证明是令人困惑的,对于应用程序来说,最好确保使用单个begin/commit对执行最高层的数据库操作。
为了为使用此模式的应用程序提供向后兼容性,可以使用以下上下文管理器或基于装饰器的类似实现::
import contextlib @contextlib.contextmanager def transaction(session): if not session.in_transaction(): with session.begin(): yield else: yield
上面的上下文管理器可以用与“subtransaction”标志相同的方式使用,例如在下面的示例中:
# method_a starts a transaction and calls method_b def method_a(session): with transaction(session): method_b(session) # method_b also starts a transaction, but when # called from method_a participates in the ongoing # transaction. def method_b(session): with transaction(session): session.add(SomeObject('bat', 'lala')) Session = sessionmaker(engine) # create a Session and call method_a with Session() as session: method_a(session)
为了与首选惯用模式进行比较,begin块应该位于最外层。这样就不需要单独的函数或方法来处理事务划分的细节:
def method_a(session): method_b(session) def method_b(session): session.add(SomeObject('bat', 'lala')) Session = sessionmaker(engine) # create a Session and call method_a with Session() as session: with session.begin(): method_a(session)
参见
从“嵌套”模式迁移 -仅基于核心的相似模式
启用两阶段提交
对于支持两阶段操作(目前是mysql和postgresql)的后端,可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 Session.prepare()
用于与不由SqlAlchemy管理的事务交互的会话。要使用两阶段事务,请设置标志 twophase=True
会议内容:
engine1 = create_engine('postgresql://db1') engine2 = create_engine('postgresql://db2') Session = sessionmaker(twophase=True) # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User:engine1, Account:engine2}) session = Session() # .... work with accounts and users # commit. session will issue a flush to all DBs, and a prepare step to all DBs, # before committing both transactions session.commit()
设置事务隔离级别/DBAPI AUTOCOMMIT
大多数dbapi支持可配置事务的概念 isolation 水平。这通常是四个级别“未提交读”、“已提交读”、“可重复读”和“可序列化”。它们通常在DBAPI连接开始一个新事务之前应用于它,注意大多数DBAPI将在SQL语句首次发出时隐式地开始该事务。
支持隔离级别的DBAPI通常也支持真正的“autocommit”概念,这意味着DBAPI连接本身将被置于非事务自动提交模式。这通常意味着,向数据库自动发出“BEGIN”的典型DBAPI行为不再发生,但也可能包含其他指令。使用此模式时, DBAPI在任何情况下都不使用事务 . SQLAlchemy方法,比如 .begin()
, .commit()
和 .rollback()
悄悄地过去。
SQLAlchemy的方言支持可设置的隔离模式 -Engine
或每 -Connection
基础,在 create_engine()
水平以及 Connection.execution_options()
水平。
使用ORM时 Session
它作为一个 外观 对于引擎和连接,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据 Engine
或 Connection
适当时。
参见
设置事务隔离级别,包括DBAPI Autocommit -一定要检查隔离级别在SQLAlChemy级别上是如何工作的 Connection
对象也是如此。
为Sessionmaker/Engine范围设置隔离
建立一个 Session
或 sessionmaker
对于全局特定的隔离级别,第一种技术是 Engine
在所有情况下都可以针对特定的隔离级别构造,然后将其用作 Session
和/或 sessionmaker
::
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine( "postgresql://scott:tiger@localhost/test", isolation_level='REPEATABLE READ' ) Session = sessionmaker(eng)
另一个选择是,如果同时有两个具有不同隔离级别的发动机,则使用 Engine.execution_options()
方法,它将生成原始的浅拷贝 Engine
它与父引擎共享相同的连接池。当操作被分为“事务性”和“自动提交”操作时,这通常更可取:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine("postgresql://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT") transactional_session = sessionmaker(eng) autocommit_session = sessionmaker(autocommit_engine)
上面的“eng`”和 "autocommit_engine"
共享相同的方言和连接池。但是,当从获取连接时,“自动提交”模式将在连接上设置 autocommit_engine
. 两者 sessionmaker
对象“transactional_session”和“`autocommit_session”``然后在处理数据库连接时继承这些特征。
“自动提交会话” 继续具有事务语义 包括那个 Session.commit()
和 Session.rollback()
仍然认为自己是“提交”和“回滚”对象,但是事务将静默缺席。因此, it is typical, though not strictly required, that a Session with AUTOCOMMIT isolation be used in a read-only fashion ,即:
with autocommit_session() as session: some_objects = session.execute(<statement>) some_other_objects = session.execute(<statement>) # closes connection
为单个会话设置隔离
当我们创造一个新的 Session
,或者直接使用构造函数,或者当我们调用 sessionmaker
,我们可以通过 bind
参数,重写先前存在的绑定。例如,我们可以创建我们的 Session
从默认设置 sessionmaker
并传递自动提交的引擎集::
plain_engine = create_engine("postgresql://scott:tiger@localhost/test") autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT") # will normally use plain_engine Session = sessionmaker(plain_engine) # make a specific Session that will use the "autocommit" engine with Session(bind=autocommit_engine) as session: # work with session
对于这个案件 Session
或 sessionmaker
配置了多个“绑定”,我们可以重新指定 binds
完全变元,或者如果我们只想替换特定的绑定,则可以使用 Session.bind_mapper()
或 Session.bind_table()
方法::
with Session() as session: session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离
关于隔离级别的一个关键警告是不能在 Connection
事务已经开始的地方。数据库无法更改正在进行的事务的隔离级别,并且一些dbapi和SQLAlchemy方言在这方面有不一致的行为。
因此,最好使用 Session
这是前端绑定到一个引擎与期望的隔离水平。但是,使用 Session.connection()
事务开始时的方法::
from sqlalchemy.orm import Session # assume session just constructed sess = Session(bind=engine) # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a real # database transaction. sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'}) # ... work with session in SERIALIZABLE isolation level... # commit transaction. the connection is released # and reverted to its previous isolation level. sess.commit() # subsequent to commit() above, a new transaction may be begun if desired, # which will proceed with the previous default isolation level unless # it is set again.
在上面,我们首先生成一个 Session
使用构造函数或 sessionmaker
。然后,我们通过调用 Session.connection()
,它提供将在数据库级事务开始之前传递给连接的执行选项。事务继续使用此选定的隔离级别。事务完成后,在连接返回到连接池之前,连接上的隔离级别将重置为其默认值。
这个 Session.begin()
方法还可以用来开始 Session
级别事务;调用 Session.connection()
可用于设置每个连接事务隔离级别::
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
用事件跟踪事务状态
见剖面图 事务事件 有关会话事务状态更改的可用事件挂钩的概述。
将会话加入外部事务(如测试套件)
如果A Connection
正在使用已处于事务状态的(即 Transaction
A) Session
只需绑定 Session
到那个 Connection
. 这通常的基本原理是一个测试套件,它允许ORM代码与 Session
包括打电话的能力 Session.commit()
,然后将回滚整个数据库交互。
在 1.4 版更改: 本节介绍一个新版本的“jointoaexternaltransaction”方法,它对两者都同样有效 2.0 style 和 1.x style 引擎和会话。以前版本(如1.3)中的配方也将继续适用于1.x引擎和会话。
配方通过建立一个 Connection
在事务和可选的保存点中,然后将其传递给 Session
作为“绑定”。这个 Session
检测到给定的 Connection
已在事务中,并且如果该事务实际上是最外层的事务,则不会对该事务运行提交。然后,当测试停止时,事务将回滚,以便恢复整个测试过程中的任何数据更改:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from unittest import TestCase # global application scope. create Session class, engine Session = sessionmaker() engine = create_engine('postgresql://...') class SomeTest(TestCase): def setUp(self): # connect to the database self.connection = engine.connect() # begin a non-ORM transaction self.trans = self.connection.begin() # bind an individual Session to the connection self.session = Session(bind=self.connection) ### optional ### # if the database supports SAVEPOINT (SQLite needs special # config for this to work), starting a savepoint # will allow tests to also use rollback within tests self.nested = self.connection.begin_nested() @event.listens_for(self.session, "after_transaction_end") def end_savepoint(session, transaction): if not self.nested.is_active: self.nested = self.connection.begin_nested() ### ^^^ optional ^^^ ### def test_something(self): # use the session in tests. self.session.add(Foo()) self.session.commit() def test_something_with_rollbacks(self): # if the SAVEPOINT steps are taken, then a test can also # use session.rollback() and continue working with the database self.session.add(Bar()) self.session.flush() self.session.rollback() self.session.add(Foo()) self.session.commit() def tearDown(self): self.session.close() # rollback - everything that happened with the # Session above (including calls to commit()) # is rolled back. self.trans.rollback() # return connection to the Engine self.connection.close()
上面的方法是SQLAlchemy自己的CI的一部分,以确保它保持预期的工作状态。