使用事件跟踪查询、对象和会话更改

优质
小牛编辑
137浏览
2023-12-01

SQL炼金术的特点是 Event Listening 整个核心和ORM使用的系统。在ORM中,有各种各样的事件侦听器钩子,这些钩子在API级别上记录在 ORM事件 . 这些年来,这些活动的集合不断增加,包括许多非常有用的新活动,以及一些与以前不太相关的旧活动。本节将尝试介绍主要事件挂钩以及它们可能被使用的时间。

执行事件

1.4 新版功能: 这个 Session 现在有一个完整的钩子,用来拦截代表ORM的所有SELECT语句以及批量更新和删除语句。这个钩子取代了前一个钩子 QueryEvents.before_compile() 事件也是如此 QueryEvents.before_compile_update()QueryEvents.before_compile_delete() .

Session 提供了一个全面的系统,通过该系统,所有查询都可以通过 Session.execute() 方法,其中包括由发出的所有SELECT语句 Query 以及代表列和关系加载器发出的所有SELECT语句都可能被截获和修改。系统利用 SessionEvents.do_orm_execute() 事件钩子以及 ORMExecuteState 对象来表示事件状态。

基本查询拦截

SessionEvents.do_orm_execute() 它首先用于任何类型的查询拦截,包括 Query 具有 1.x style 以及启用ORM时 2.0 style select()update()delete() 构造交付到 Session.execute() . 这个 ORMExecuteState 构造提供访问器以允许对语句、参数和选项进行修改:

Session = sessionmaker(engine, future=True)

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if orm_execute_state.is_select:
        # add populate_existing for all SELECT statements

        orm_execute_state.update_execution_options(populate_existing=True)

        # check if the SELECT is against a certain entity and add an
        # ORDER BY if so
        col_descriptions = orm_execute_state.statement.column_descriptions

        if col_descriptions[0]['entity'] is MyEntity:
            orm_execute_state.statement = statement.order_by(MyEntity.name)

上面的示例演示了对SELECT语句的一些简单修改。在这个层面上 SessionEvents.do_orm_execute() 事件钩子打算替换以前使用的 QueryEvents.before_compile() 事件,对于不同类型的装载机,它不是一致地触发的;此外 QueryEvents.before_compile() 仅适用于 1.x style 与一起使用 Query 而不是 2.0 style 使用 Session.execute() .

添加全局WHERE/ON条件

最需要的查询扩展功能之一是能够将WHERE条件添加到所有查询中实体的所有出现处。这可以通过利用 with_loader_criteria() 查询选项,它可以单独使用,或者非常适合在 SessionEvents.do_orm_execute() 事件:

from sqlalchemy.orm import with_loader_criteria

Session = sessionmaker(engine, future=True)

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):

    if (
        orm_execute_state.is_select and
        not orm_execute_state.is_column_load and
        not orm_execute_state.is_relationship_load
    ):
        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(MyEntity.public == True)
        )

上面,向所有SELECT语句添加了一个选项,该选项将限制所有查询 MyEntity 到过滤上 public == True 。该标准将应用于 all 立即查询范围内的该类加载。这个 with_loader_criteria() 默认情况下,选项也会自动传播到关系加载器,这将应用于后续的关系加载,包括惰性加载、选择加载等。

对于一系列都具有某些公共列结构的类,如果这些类是使用 declarative mixin ,mixin类本身可以与 with_loader_criteria() 通过使用Python lambda。Python lambda将在查询编译时针对符合条件的特定实体调用。给定一系列基于mixin的类 HasTimestamp ::

import datetime

class HasTimestamp(object):
    timestamp = Column(DateTime, default=datetime.datetime.now)


class SomeEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = Column(Integer, primary_key=True)

class SomeOtherEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = Column(Integer, primary_key=True)

以上等级 SomeEntitySomeOtherEntity 每个人都会有一个专栏吗 timestamp 默认为当前日期和时间。事件可用于截获从 HasTimestamp 过滤他们的 timestamp 不早于一个月前的日期的列:

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
            orm_execute_state.is_select
            and not orm_execute_state.is_column_load
            and not orm_execute_state.is_relationship_load
    ):
        one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)

        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(
                HasTimestamp,
                lambda cls: cls.timestamp >= one_month_ago,
                include_aliases=True
            )
        )

警告

在调用中使用lambda来 with_loader_criteria() 仅调用 每个唯一类一次 。不应在此lambda内调用自定义函数。看见 使用Lambdas为语句生成添加显著的速度增益 获取“lambda SQL”特性的概述,该特性仅供高级使用。

参见

ORM查询事件 -包括上述工作示例 with_loader_criteria() 食谱。

重新执行语句

Deep Alchemy

语句重新执行特性涉及一个稍微复杂的递归序列,旨在解决将SQL语句的执行重新路由到各种非SQL上下文中这一相当困难的问题。下面链接的“dogpile缓存”和“水平分片”这两个例子,应该作为一个指南,说明何时应该使用这个相当高级的特性。

这个 ORMExecuteState 能够控制给定语句的执行;这包括完全不调用语句的能力,允许返回从缓存中检索到的预先构造的结果集,以及能够以不同的状态重复调用同一语句,例如针对多个数据库调用它连接,然后将结果合并到内存中。这两种高级模式都在SQLAlchemy的示例套件中进行了演示,详情如下。

在里面的时候 SessionEvents.do_orm_execute() 事件钩子 ORMExecuteState.invoke_statement() 方法可用于使用新的嵌套调用调用 Session.execute() 然后返回当前执行的进程,而不是返回当前的执行 Result 由内部执行返回。到目前为止为 SessionEvents.do_orm_execute() 此进程中的钩子也将在此嵌套调用中跳过。

这个 ORMExecuteState.invoke_statement() 方法返回 Result 对象;此对象的功能是将其“冻结”为可缓存格式,并“解冻”为新的 Result 对象,以及将其数据与其他对象的数据合并 Result 物体。

E、 g.,使用 SessionEvents.do_orm_execute() 要实现缓存:

from sqlalchemy.orm import loading

cache = {}

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if "my_cache_key" in orm_execute_state.execution_options:
        cache_key = orm_execute_state.execution_options["my_cache_key"]

        if cache_key in cache:
            frozen_result = cache[cache_key]
        else:
            frozen_result = orm_execute_state.invoke_statement().freeze()
            cache[cache_key] = frozen_result

        return loading.merge_frozen_result(
            orm_execute_state.session,
            orm_execute_state.statement,
            frozen_result,
            load=False,
        )

有了上面的钩子,使用缓存的示例如下所示:

stmt = select(User).where(User.name == 'sandy').execution_options(my_cache_key="key_sandy")

result = session.execute(stmt)

在上面,一个自定义的执行选项被传递给 Select.execution_options() 为了建立一个“缓存密钥”,它将被 SessionEvents.do_orm_execute() 钩子。然后将此缓存键匹配到 FrozenResult 缓存中可能存在的对象,如果存在,则重用该对象。这个食谱利用了 Result.freeze() “冻结”a的方法 Result 对象,上面将包含ORM结果,因此它可以存储在缓存中并多次使用。为了从“冻结”结果返回实时结果 merge_frozen_result() 函数用于将结果对象中的“冻结”数据合并到当前会话中。

上面的示例作为完整的示例在中实现 狗堆缓存 .

这个 ORMExecuteState.invoke_statement() 方法也可以多次调用,将不同的信息传递给 ORMExecuteState.invoke_statement.bind_arguments 参数使 Session 会利用不同的 Engine 每次对象。这将返回一个不同的 Result 对象;这些结果可以使用 Result.merge() 方法。这是 水平切分 扩展;请参阅源代码以熟悉。

参见

狗堆缓存

水平切分

持久性事件

可能最广泛使用的一系列事件是“持久性”事件,它对应于 flush process . 刷新是指对对象的挂起更改做出所有决定,然后以插入、更新和删除语句的形式发送到数据库。

before_flush()

这个 SessionEvents.before_flush() 钩子是到目前为止最常用的事件,当应用程序希望确保在刷新进行时对数据库进行额外的持久性更改时,可以使用钩子。使用 SessionEvents.before_flush() 以便对对象进行操作以验证其状态,并在持久化之前组合其他对象和引用。在这次活动中, 可以安全地操纵会话的状态 也就是说,可以附加新对象,删除对象,自由更改对象上的单个属性,这些更改将在事件挂钩完成时拉入刷新过程。

典型 SessionEvents.before_flush() Hook将负责扫描集合 Session.newSession.dirtySession.deleted 以寻找将要发生事情的对象。

用于说明 SessionEvents.before_flush() ,参见示例,例如 使用历史记录表进行版本控制使用临时行进行版本控制 .

after_flush()

这个 SessionEvents.after_flush() 在为刷新进程发出SQL之后调用hook,但是 之前 刷新对象的状态已更改。也就是说,你仍然可以检查 Session.newSession.dirtySession.deleted 集合以查看刚刚刷新的内容,还可以使用历史跟踪功能,如 AttributeState 以查看刚刚持续的更改。在 SessionEvents.after_flush() 事件,可以根据观察到的变化向数据库发出额外的SQL。

after_flush_postexec()

SessionEvents.after_flush_postexec() 很快就要打电话了 SessionEvents.after_flush() ,但被调用 之后 对象的状态已被修改,以解释刚刚发生的刷新。这个 Session.newSession.dirtySession.deleted 这里的集合通常是完全空的。使用 SessionEvents.after_flush_postexec() 以检查已完成对象的标识映射,并可能发出其他SQL。在这个钩子中,有能力对对象进行新的更改,这意味着 Session 将再次进入“肮脏”状态;机械的 Session 这会使它冲洗 再一次 如果在此钩子中检测到新的更改,则在 Session.commit() 否则,挂起的更改将作为下一个正常刷新的一部分捆绑在一起。当钩子检测到 Session.commit() ,计数器确保在100次迭代后停止这方面的无限循环,如果 SessionEvents.after_flush_postexec() 钩子不断地添加每次调用时要刷新的新状态。

映射器级别事件

除了齐平级别的钩子之外,还有一组更细粒度的钩子,因为它们是基于每个对象调用的,并且是基于插入、更新或删除而断开的。这些是映射器持久性挂钩,它们也非常流行,但是需要更谨慎地处理这些事件,因为它们在已经在进行的刷新过程的上下文中进行;许多操作在这里进行并不安全。

事件是:

每个事件都通过 Mapper ,映射对象本身,以及 Connection 它正用于发出INSERT、UPDATE或DELETE语句。这些事件的吸引力很明显,因为如果应用程序希望将某些活动与某个特定类型的对象通过插入持久化时相关联,那么钩子是非常特定的;与 SessionEvents.before_flush() 事件,不需要像 Session.new 为了找到目标。但是,表示要发出的每个insert、update、delete语句的完整列表的flush计划已经 已经决定了 当调用这些事件时,在此阶段不能进行任何更改。因此,对给定对象的唯一可能更改是基于属性 地方的 到对象的行。对对象或其他对象的任何其他更改都将影响 Session 将无法正常工作。

这些映射器级别的持久性事件不支持的操作包括:

  • Session.add()

  • Session.delete()

  • 映射集合追加、添加、删除、删除、放弃等。

  • 映射关系属性set/del事件,即 someobject.related = someotherobject

原因 Connection 通过是鼓励 这里进行简单的SQL操作 ,直接在 Connection 例如递增计数器或在日志表中插入额外的行。当处理 Connection ,预计将使用核心级别的SQL操作;例如 SQL表达式语言教程(1.x API) .

还有许多每个对象的操作根本不需要在刷新事件中处理。最常见的选择是简单地在对象内部建立附加状态 __init__() 方法,例如创建要与新对象关联的其他对象。如中所述使用验证器 简单验证器 是另一种方法;这些函数可以截获对属性的更改,并在目标对象上建立响应属性更改的附加状态更改。使用这两种方法,对象在到达刷新步骤之前处于正确的状态。

对象生命周期事件

事件的另一个用例是跟踪对象的生命周期。这是指在 Quickie对象状态简介 .

1.1 新版功能: 添加了一个事件系统,该系统截获对象在 Session .

以上所有状态都可以通过事件完全跟踪。每个事件都代表一个不同的状态转换,这意味着开始状态和目标状态都是被跟踪的部分。除初始瞬态事件外,所有事件都是 Session 对象或类,这意味着它们可以与特定的 Session 对象:

from sqlalchemy import event
from sqlalchemy.orm import Session

session = Session()

@event.listens_for(session, 'transient_to_pending')
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

或与 Session 类本身,以及 sessionmaker ,这可能是最有用的形式:

from sqlalchemy import event
from sqlalchemy.orm import sessionmaker

maker = sessionmaker()

@event.listens_for(maker, 'transient_to_pending')
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

当然,侦听器可以堆叠在一个函数之上,这很可能是常见的。例如,要跟踪所有进入持久状态的对象:

@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
    print("object is now persistent: %s" % instance)

瞬态

第一次构造时的所有映射对象都以 transient . 在此状态下,对象单独存在,并且不与任何 Session . 对于这个初始状态,没有特定的“转换”事件,因为没有 Session 但是,如果要在创建任何瞬态对象时拦截,则 InstanceEvents.init() 方法可能是最好的事件。此事件应用于特定的类或超类。例如,要截获特定声明性基的所有新对象:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event

Base = declarative_base()

@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
    print("new transient: %s" % instance)

过渡到挂起

瞬变物体变成 pending 当它第一次与 Session 通过 Session.add()Session.add_all() 方法。对象也可能成为 Session 由于 "cascade" 从显式添加的引用对象。使用 SessionEvents.transient_to_pending() 事件:

@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
    print("transient to pending: %s" % object_)

挂起到永久

这个 pending 对象变为 persistent 当刷新进行并且实例发生insert语句时。对象现在有一个标识密钥。跟踪挂起到永久 SessionEvents.pending_to_persistent() 事件:

@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
    print("pending to persistent: %s" % object_)

等待过渡

这个 pending 对象可以还原为 transient 如果 Session.rollback() 方法在刷新挂起对象之前调用,或者如果 Session.expunge() 在刷新对象之前为其调用方法。跟踪待处理到瞬态 SessionEvents.pending_to_transient() 事件:

@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
    print("transient to pending: %s" % object_)

作为持久加载

对象可以出现在 Session 直接在 persistent 从数据库加载时的状态。跟踪此状态转换与加载时跟踪对象同义,并且与使用 InstanceEvents.load() 实例级事件。然而, SessionEvents.loaded_as_persistent() 事件作为以会话为中心的钩子提供,用于拦截通过此特定路径进入持久状态的对象:

@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
    print("object loaded into persistent state: %s" % object_)

持续到短暂

如果 Session.rollback() 方法是为一个事务调用的,在该事务中,首先将对象添加为挂起的。在回滚的情况下,使该对象持久化的insert语句将回滚,并从 Session 再次变成短暂的。使用 SessionEvents.persistent_to_transient() 事件钩子::

@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
    print("persistent to transient: %s" % object_)

持续到已删除

持久对象进入 deleted 在刷新过程中从数据库中删除标记为删除的对象时的状态。注意这是 不一样 正如当 Session.delete() 为目标对象调用方法。这个 Session.delete() 仅方法 标志 要删除的对象;实际的DELETE语句在刷新进行之前不会发出。刷新之后,目标对象将出现“已删除”状态。

在“已删除”状态下,对象仅与 Session . 它不存在于身份图中,也不存在于 Session.deleted 在挂起删除时引用的集合。

从“已删除”状态,对象可以在提交事务时转到分离状态,或者在回滚事务时回到持久状态。

使用跟踪持续到已删除的转换 SessionEvents.persistent_to_deleted() ::

@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
    print("object was DELETEd, is now in deleted state: %s" % object_)

已删除到已分离

删除的对象变为 detached 当会话的事务被提交时。后 Session.commit() 方法,数据库事务是最终的,并且 Session 现在完全丢弃已删除的对象并删除与之相关的所有关联。使用跟踪已删除到已分离的转换 SessionEvents.deleted_to_detached() ::

@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
    print("deleted to detached: %s" % object_)

注解

当对象处于已删除状态时, InstanceState.deleted 属性,可使用访问 inspect(object).deleted ,返回true。然而,当物体被分离时, InstanceState.deleted 将再次返回false。要检测对象是否已被删除,无论是否已分离,请使用 InstanceState.was_deleted 访问器。

坚持分离

持久对象变成 detached 当对象与 Session ,通过 Session.expunge()Session.expunge_all()Session.close() 方法。

注解

一个物体也可能变成 隐式分离 如果拥有 Session 被应用程序取消引用并由于垃圾收集而丢弃。在这种情况下, 未发出任何事件 .

使用 SessionEvents.persistent_to_detached() 事件:

@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
    print("object became detached: %s" % object_)

分离到持久

当分离的对象与使用 Session.add() 或等效方法。使用 SessionEvents.detached_to_persistent() 事件:

@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
    print("object became persistent again: %s" % object_)

删除到永久

这个 deleted 对象可以还原为 persistent 状态何时使用 Session.rollback() 方法。使用 SessionEvents.deleted_to_persistent() 事件:

@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
    print("deleted to persistent: %s" % object_)

事务事件

事务事件允许在事务边界出现时通知应用程序 Session 水平以及何时 Session 更改上的事务状态 Connection 物体。

属性更改事件

属性更改事件允许在修改对象上的特定属性时拦截。这些活动包括 AttributeEvents.set()AttributeEvents.append()AttributeEvents.remove() . 这些事件非常有用,特别是对于每个对象的验证操作;但是,使用“验证器”钩子(在后台使用这些钩子)通常更方便;请参见 简单验证器 关于这方面的背景。属性事件也是后向引用机制的背后。演示属性事件使用的示例在 属性检测 .