事务和连接管理

优质
小牛编辑
129浏览
2023-12-01

管理交易

在 1.4 版更改: 对会话事务管理进行了修改,使其更清晰、更易于使用。特别是,它现在具有“autobegin”操作的特点,这意味着事务开始的点可以被控制,而不必使用传统的“autocommit”模式。

这个 Session 一次跟踪单个“虚拟”事务的状态,使用一个名为 SessionTransaction 。然后,该对象利用基础的 Engine 或引擎,而这些引擎 Session 对象进行绑定,以便使用 Connection 对象。

或者在需要时使用“虚拟事务”自动启动 Session.begin() 方法。在很大程度上,Python上下文管理器的使用在创建级别上都得到了支持 Session 对象以及维护 SessionTransaction .

下面,假设我们从 Session ::

from sqlalchemy.orm import Session
session = Session(engine)

我们现在可以使用上下文管理器在已划分的事务中运行操作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

在上述上下文的末尾,假设没有引发异常,任何挂起的对象都将刷新到数据库中,并提交数据库事务。如果在上述块中引发异常,则事务将回滚。在这两种情况下 Session 退出块之后,可以在后续事务中使用。

这个 Session.begin() 方法是可选的,并且 Session 也可以用在“按需提交”方法中,它将根据需要自动开始事务;这些事务只需要提交或回滚:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(< some select statement >)
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()   # rolls back still_another_object

这个 Session 其特点是 Session.close() 方法。如果 Session 如果在尚未提交或回滚的事务中开始,则此方法将取消(即回滚)该事务,并且还将删除包含在 Session 对象的状态。如果 Session 正在以这样一种方式使用 Session.commit()Session.rollback() 不保证(例如,不在上下文管理器或类似工具中),则 close 方法可用于确保释放所有资源:

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

最后,会话构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session 对象的使用范围在固定块内。通过 Session 先建设者:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(<some SELECT statement>)

# remaining transactional state from the .execute() call is
# discarded

同样, sessionmaker 可以用同样的方式使用:

Session = sesssionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session

sessionmaker 包含自身 sessionmaker.begin() 方法以允许两个操作同时进行::

with Session.begin() as session:
    session.add(some_object):

使用保存点

如果基础引擎支持保存点事务,则可以使用 Session.begin_nested() 方法:

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested() # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2

每次 Session.begin_nested() 调用时,一个新的“BEGIN SAVEPOINT”命令将使用唯一标识符发送到数据库。什么时候 SessionTransaction.commit() 调用时,将在数据库上发出“Release SavePoint”,并且如果 SessionTransaction.rollback() 调用时,将发出“回滚到保存点”。

Session.begin_nested() 也可以用作上下文管理器,方式与 Session.begin() 方法:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

什么时候? Session.begin_nested() 被称为 Session.flush() 无条件发布(无论 autoflush 设置)。这样,当对该嵌套事务执行回滚时,会话的完整状态将过期,从而导致所有后续属性/实例访问引用 Session 就在之前 Session.begin_nested() 被叫来。

参见

NestedTransaction -The NestedTransaction 类是由 Session 在内部生成保存点块。

会话级与引擎级事务控制

从SQLAlchemy 1.4开始 sessionmaker 核心 Engine 两个对象都支持 2.0 style 通过利用 Session.future 标志以及 create_engine.future 使这两个对象采用2.0样式的语义。

当使用future模式时,两个包之间应该在 sessionmakerEngine 以及 SessionConnection . 以下部分将根据以下方案详细说明这些方案:

ORM (using future Session)                    Core (using future engine)
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

边做边做

两个 SessionConnection 特征 Connection.commit()Connection.rollback() 方法。使用SQLAlchemy 2.0样式的操作,这些方法会影响 最外层 所有情况下的交易。

发动机:

engine = create_engine("postgresql://user:pass@host/dbname", future=True)

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
    conn.commit()

会议:

Session = sessionmaker(engine, future=True)

with Session() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
    session.commit()

开始一次

两个 sessionmakerEngine 特征a Engine.begin() 方法,该方法将获取一个新对象,用它来执行SQL语句(方法 SessionConnection ,然后返回将维护该对象的begin/commit/rollback上下文的上下文管理器。

发动机:

engine = create_engine("postgresql://user:pass@host/dbname", future=True)

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
# commits and closes automatically

会议:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
# commits and closes automatically

嵌套事务

当通过 Session.begin_nested()Connection.begin_nested() 方法,则返回的事务对象必须用于提交或回滚保存点。召唤 Session.commit()Connection.commit() 方法将始终提交 最外层 事务;这是SQLAlchemy 2.0特有的行为,与1.x系列相反。

发动机:

engine = create_engine("postgresql://user:pass@host/dbname", future=True)

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
    savepoint.commit()  # or rollback

# commits automatically

会议:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
    savepoint.commit()  # or rollback
# commits automatically

显式开始

在 1.4 版更改: SQLAlchemy 1.4不推荐使用“自动提交模式”,它以前是通过使用 Session.autocommit 旗子。未来,一种允许使用 Session.begin() 方法是新的“autobegin”行为,因此当 Session 是第一次构造的,或者在前一个事务结束之后,在它开始一个新的事务之前。

有关从依赖begin()/commit()对嵌套的框架从“subtransaction”模式迁移的背景信息,请参阅下一节 从“子事务”模式迁移 .

这个 Session 特点是“自动启动”行为,这意味着一旦操作开始,它就可以确保 SessionTransaction 以跟踪正在进行的操作。此事务在以下情况下完成 Session.commit() 被称为。

通常需要控制“开始”操作发生的点,尤其是在框架集成中。为了适应这种情况 Session 使用“自动注册”策略,这样 Session.begin() 方法可以直接为 Session 尚未开始交易:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'
    session.commit()
except:
    session.rollback()
    raise

使用上下文管理器更习惯地调用上述模式:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'

这个 Session.begin() 方法和会话的“autobegin”进程使用相同的步骤序列开始事务。这包括 SessionEvents.after_transaction_create() 事件发生时调用事件;框架使用这个钩子,以便将自己的事务处理与ORM的事务处理集成 Session .

从“子事务”模式迁移

1.4 版后已移除: 这个 Session.begin.subtransactions 标志已弃用。而 Session 仍然在内部使用“subtransactions”模式,它不适合最终用户使用,因为它会导致混乱,此外,它可能会从 Session 一旦“自动提交”模式被删除,它本身就处于2.0版中。

通常与autocommit模式一起使用的“subtransaction”模式在1.4中也被弃用。此模式允许使用 Session.begin() 方法,从而生成一个称为“subtransaction”的构造,该构造本质上是一个阻止 Session.commit() 方法。

这种模式在实际的应用程序中已经被证明是令人困惑的,对于应用程序来说,最好确保使用单个begin/commit对执行最高层的数据库操作。

为了为使用此模式的应用程序提供向后兼容性,可以使用以下上下文管理器或基于装饰器的类似实现::

import contextlib

@contextlib.contextmanager
def transaction(session):
    if not session.in_transaction():
        with session.begin():
            yield
    else:
        yield

上面的上下文管理器可以用与“subtransaction”标志相同的方式使用,例如在下面的示例中:

# method_a starts a transaction and calls method_b
def method_a(session):
    with transaction(session):
        method_b(session)

# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
    with transaction(session):
        session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    method_a(session)

为了与首选惯用模式进行比较,begin块应该位于最外层。这样就不需要单独的函数或方法来处理事务划分的细节:

def method_a(session):
    method_b(session)

def method_b(session):
    session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    with session.begin():
        method_a(session)

参见

从“嵌套”模式迁移 -仅基于核心的相似模式

启用两阶段提交

对于支持两阶段操作(目前是mysql和postgresql)的后端,可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 Session.prepare() 用于与不由SqlAlchemy管理的事务交互的会话。要使用两阶段事务,请设置标志 twophase=True 会议内容:

engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别/DBAPI AUTOCOMMIT

大多数dbapi支持可配置事务的概念 isolation 水平。这通常是四个级别“未提交读”、“已提交读”、“可重复读”和“可序列化”。它们通常在DBAPI连接开始一个新事务之前应用于它,注意大多数DBAPI将在SQL语句首次发出时隐式地开始该事务。

支持隔离级别的DBAPI通常也支持真正的“autocommit”概念,这意味着DBAPI连接本身将被置于非事务自动提交模式。这通常意味着,向数据库自动发出“BEGIN”的典型DBAPI行为不再发生,但也可能包含其他指令。使用此模式时, DBAPI在任何情况下都不使用事务 . SQLAlchemy方法,比如 .begin().commit().rollback() 悄悄地过去。

SQLAlchemy的方言支持可设置的隔离模式 -Engine 或每 -Connection 基础,在 create_engine() 水平以及 Connection.execution_options() 水平。

使用ORM时 Session 它作为一个 外观 对于引擎和连接,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据 EngineConnection 适当时。

参见

设置事务隔离级别,包括DBAPI Autocommit -一定要检查隔离级别在SQLAlChemy级别上是如何工作的 Connection 对象也是如此。

为Sessionmaker/Engine范围设置隔离

建立一个 Sessionsessionmaker 对于全局特定的隔离级别,第一种技术是 Engine 在所有情况下都可以针对特定的隔离级别构造,然后将其用作 Session 和/或 sessionmaker ::

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql://scott:tiger@localhost/test",
    isolation_level='REPEATABLE READ'
)

Session = sessionmaker(eng)

另一个选择是,如果同时有两个具有不同隔离级别的发动机,则使用 Engine.execution_options() 方法,它将生成原始的浅拷贝 Engine 它与父引擎共享相同的连接池。当操作被分为“事务性”和“自动提交”操作时,这通常更可取:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

上面的“eng`”和 "autocommit_engine" 共享相同的方言和连接池。但是,当从获取连接时,“自动提交”模式将在连接上设置 autocommit_engine . 两者 sessionmaker 对象“transactional_session”和“`autocommit_session”``然后在处理数据库连接时继承这些特征。

“自动提交会话” 继续具有事务语义 包括那个 Session.commit()Session.rollback() 仍然认为自己是“提交”和“回滚”对象,但是事务将静默缺席。因此, it is typical, though not strictly required, that a Session with AUTOCOMMIT isolation be used in a read-only fashion ,即:

with autocommit_session() as session:
    some_objects = session.execute(<statement>)
    some_other_objects = session.execute(<statement>)

# closes connection

为单个会话设置隔离

当我们创造一个新的 Session ,或者直接使用构造函数,或者当我们调用 sessionmaker ,我们可以通过 bind 参数,重写先前存在的绑定。例如,我们可以创建我们的 Session 从默认设置 sessionmaker 并传递自动提交的引擎集::

plain_engine = create_engine("postgresql://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session

对于这个案件 Sessionsessionmaker 配置了多个“绑定”,我们可以重新指定 binds 完全变元,或者如果我们只想替换特定的绑定,则可以使用 Session.bind_mapper()Session.bind_table() 方法::

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

为单个事务设置隔离

关于隔离级别的一个关键警告是不能在 Connection 事务已经开始的地方。数据库无法更改正在进行的事务的隔离级别,并且一些dbapi和SQLAlchemy方言在这方面有不一致的行为。

因此,最好使用 Session 这是前端绑定到一个引擎与期望的隔离水平。但是,使用 Session.connection() 事务开始时的方法::

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

在上面,我们首先生成一个 Session 使用构造函数或 sessionmaker 。然后,我们通过调用 Session.connection() ,它提供将在数据库级事务开始之前传递给连接的执行选项。事务继续使用此选定的隔离级别。事务完成后,在连接返回到连接池之前,连接上的隔离级别将重置为其默认值。

这个 Session.begin() 方法还可以用来开始 Session 级别事务;调用 Session.connection() 可用于设置每个连接事务隔离级别::

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

用事件跟踪事务状态

见剖面图 事务事件 有关会话事务状态更改的可用事件挂钩的概述。

将会话加入外部事务(如测试套件)

如果A Connection 正在使用已处于事务状态的(即 Transaction A) Session 只需绑定 Session 到那个 Connection . 这通常的基本原理是一个测试套件,它允许ORM代码与 Session 包括打电话的能力 Session.commit() ,然后将回滚整个数据库交互。

在 1.4 版更改: 本节介绍一个新版本的“jointoaexternaltransaction”方法,它对两者都同样有效 2.0 style1.x style 引擎和会话。以前版本(如1.3)中的配方也将继续适用于1.x引擎和会话。

配方通过建立一个 Connection 在事务和可选的保存点中,然后将其传递给 Session 作为“绑定”。这个 Session 检测到给定的 Connection 已在事务中,并且如果该事务实际上是最外层的事务,则不会对该事务运行提交。然后,当测试停止时,事务将回滚,以便恢复整个测试过程中的任何数据更改:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine('postgresql://...')

class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()


        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)


        ###    optional     ###

        # if the database supports SAVEPOINT (SQLite needs special
        # config for this to work), starting a savepoint
        # will allow tests to also use rollback within tests

        self.nested = self.connection.begin_nested()

        @event.listens_for(self.session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if not self.nested.is_active:
                self.nested = self.connection.begin_nested()

        ### ^^^ optional ^^^ ###

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        # if the SAVEPOINT steps are taken, then a test can also
        # use session.rollback() and continue working with the database

        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

上面的方法是SQLAlchemy自己的CI的一部分,以确保它保持预期的工作状态。