examples.versioned_rows.versioned_map
优质
小牛编辑
135浏览
2023-12-01
"""A variant of the versioned_rows example built around the concept of a "vertical table" structure, like those illustrated in :ref:`examples_vertical_tables` examples. Here we store a dictionary of key/value pairs, storing the k/v's in a "vertical" fashion where each key gets a row. The value is split out into two separate datatypes, string and int - the range of datatype storage can be adjusted for individual needs. Changes to the "data" attribute of a ConfigData object result in the ConfigData object being copied into a new one, and new associations to its data are created. Values which aren't changed between versions are referenced by both the former and the newer ConfigData object. Overall, only INSERT statements are emitted - no rows are UPDATed or DELETEd. An optional feature is also illustrated which associates individual key/value pairs with the ConfigData object in which it first originated. Since a new row is only persisted when a new value is created for a particular key, the recipe provides a way to query among the full series of changes which occurred for any particular key in the dictionary. The set of all ConfigData in a particular table represents a single series of versions. By adding additional columns to ConfigData, the system can be made to store multiple version streams distinguished by those additional values. """ from sqlalchemy import Column from sqlalchemy import create_engine from sqlalchemy import event from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import attributes from sqlalchemy.orm import backref from sqlalchemy.orm import make_transient from sqlalchemy.orm import relationship from sqlalchemy.orm import Session from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import validates from sqlalchemy.orm.collections import attribute_mapped_collection @event.listens_for(Session, "before_flush") def before_flush(session, flush_context, instances): """Apply the new_version() method of objects which are marked as dirty during a flush. """ for instance in session.dirty: if hasattr(instance, "new_version") and session.is_modified(instance): # make it transient instance.new_version(session) # re-add session.add(instance) Base = declarative_base() class ConfigData(Base): """Represent a series of key/value pairs. ConfigData will generate a new version of itself upon change. The "data" dictionary provides access via string name mapped to a string/int value. """ __tablename__ = "config" id = Column(Integer, primary_key=True) """Primary key column of this ConfigData.""" elements = relationship( "ConfigValueAssociation", collection_class=attribute_mapped_collection("name"), backref=backref("config_data"), lazy="subquery", ) """Dictionary-backed collection of ConfigValueAssociation objects, keyed to the name of the associated ConfigValue. Note there's no "cascade" here. ConfigValueAssociation objects are never deleted or changed. """ def _new_value(name, value): """Create a new entry for usage in the 'elements' dictionary.""" return ConfigValueAssociation(ConfigValue(name, value)) data = association_proxy("elements", "value", creator=_new_value) """Proxy to the 'value' elements of each related ConfigValue, via the 'elements' dictionary. """ def __init__(self, data): self.data = data @validates("elements") def _associate_with_element(self, key, element): """Associate incoming ConfigValues with this ConfigData, if not already associated. This is an optional feature which allows more comprehensive history tracking. """ if element.config_value.originating_config is None: element.config_value.originating_config = self return element def new_version(self, session): # convert to an INSERT make_transient(self) self.id = None # history of the 'elements' collection. # this is a tuple of groups: (added, unchanged, deleted) hist = attributes.get_history(self, "elements") # rewrite the 'elements' collection # from scratch, removing all history attributes.set_committed_value(self, "elements", {}) # new elements in the "added" group # are moved to our new collection. for elem in hist.added: self.elements[elem.name] = elem # copy elements in the 'unchanged' group. # the new ones associate with the new ConfigData, # the old ones stay associated with the old ConfigData for elem in hist.unchanged: self.elements[elem.name] = ConfigValueAssociation( elem.config_value ) # we also need to expire changes on each ConfigValueAssociation # that is to remain associated with the old ConfigData. # Here, each one takes care of that in its new_version() # method, though we could do that here as well. class ConfigValueAssociation(Base): """Relate ConfigData objects to associated ConfigValue objects.""" __tablename__ = "config_value_association" config_id = Column(ForeignKey("config.id"), primary_key=True) """Reference the primary key of the ConfigData object.""" config_value_id = Column(ForeignKey("config_value.id"), primary_key=True) """Reference the primary key of the ConfigValue object.""" config_value = relationship("ConfigValue", lazy="joined", innerjoin=True) """Reference the related ConfigValue object.""" def __init__(self, config_value): self.config_value = config_value def new_version(self, session): """Expire all pending state, as ConfigValueAssociation is immutable.""" session.expire(self) @property def name(self): return self.config_value.name @property def value(self): return self.config_value.value @value.setter def value(self, value): """Intercept set events. Create a new ConfigValueAssociation upon change, replacing this one in the parent ConfigData's dictionary. If no net change, do nothing. """ if value != self.config_value.value: self.config_data.elements[self.name] = ConfigValueAssociation( ConfigValue(self.config_value.name, value) ) class ConfigValue(Base): """Represent an individual key/value pair at a given point in time. ConfigValue is immutable. """ __tablename__ = "config_value" id = Column(Integer, primary_key=True) name = Column(String(50), nullable=False) originating_config_id = Column( Integer, ForeignKey("config.id"), nullable=False ) int_value = Column(Integer) string_value = Column(String(255)) def __init__(self, name, value): self.name = name self.value = value originating_config = relationship("ConfigData") """Reference to the originating ConfigData. This is optional, and allows history tracking of individual values. """ def new_version(self, session): raise NotImplementedError("ConfigValue is immutable.") @property def value(self): for k in ("int_value", "string_value"): v = getattr(self, k) if v is not None: return v else: return None @value.setter def value(self, value): if isinstance(value, int): self.int_value = value self.string_value = None else: self.string_value = str(value) self.int_value = None if __name__ == "__main__": engine = create_engine("sqlite://", echo=True) Base.metadata.create_all(engine) Session = sessionmaker(engine) sess = Session() config = ConfigData( {"user_name": "twitter", "hash_id": "4fedffca37eaf", "x": 27, "y": 450} ) sess.add(config) sess.commit() version_one = config.id config.data["user_name"] = "yahoo" sess.commit() version_two = config.id assert version_one != version_two # two versions have been created. assert config.data == { "user_name": "yahoo", "hash_id": "4fedffca37eaf", "x": 27, "y": 450, } old_config = sess.query(ConfigData).get(version_one) assert old_config.data == { "user_name": "twitter", "hash_id": "4fedffca37eaf", "x": 27, "y": 450, } # the history of any key can be acquired using # the originating_config_id attribute history = ( sess.query(ConfigValue) .filter(ConfigValue.name == "user_name") .order_by(ConfigValue.originating_config_id) .all() ) assert [(h.value, h.originating_config_id) for h in history] == ( [("twitter", version_one), ("yahoo", version_two)] )