examples.performance.single_inserts

优质
小牛编辑
139浏览
2023-12-01
"""In this series of tests, we're looking at a method that inserts a row
within a distinct transaction, and afterwards returns to essentially a
"closed" state.   This would be analogous to an API call that starts up
a database connection, inserts the row, commits and closes.

"""
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import pool
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from . import Profiler


Base = declarative_base()
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String(255))


Profiler.init("single_inserts", num=10000)


@Profiler.setup
def setup_database(dburl, echo, num):
    global engine
    engine = create_engine(dburl, echo=echo)
    if engine.dialect.name == "sqlite":
        engine.pool = pool.StaticPool(creator=engine.pool._creator)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


@Profiler.profile
def test_orm_commit(n):
    """Individual INSERT/COMMIT pairs via the ORM"""

    for i in range(n):
        session = Session(bind=engine)
        session.add(
            Customer(
                name="customer name %d" % i,
                description="customer description %d" % i,
            )
        )
        session.commit()


@Profiler.profile
def test_bulk_save(n):
    """Individual INSERT/COMMIT pairs using the "bulk" API"""

    for i in range(n):
        session = Session(bind=engine)
        session.bulk_save_objects(
            [
                Customer(
                    name="customer name %d" % i,
                    description="customer description %d" % i,
                )
            ]
        )
        session.commit()


@Profiler.profile
def test_bulk_insert_dictionaries(n):
    """Individual INSERT/COMMIT pairs using the "bulk" API with dictionaries"""

    for i in range(n):
        session = Session(bind=engine)
        session.bulk_insert_mappings(
            Customer,
            [
                dict(
                    name="customer name %d" % i,
                    description="customer description %d" % i,
                )
            ],
        )
        session.commit()


@Profiler.profile
def test_core(n):
    """Individual INSERT/COMMIT pairs using Core."""

    for i in range(n):
        with engine.begin() as conn:
            conn.execute(
                Customer.__table__.insert(),
                dict(
                    name="customer name %d" % i,
                    description="customer description %d" % i,
                ),
            )


@Profiler.profile
def test_core_query_caching(n):
    """Individual INSERT/COMMIT pairs using Core with query caching"""

    cache = {}
    ins = Customer.__table__.insert()
    for i in range(n):
        with engine.begin() as conn:
            conn.execution_options(compiled_cache=cache).execute(
                ins,
                dict(
                    name="customer name %d" % i,
                    description="customer description %d" % i,
                ),
            )


@Profiler.profile
def test_dbapi_raw_w_connect(n):
    """Individual INSERT/COMMIT pairs w/ DBAPI + connection each time"""

    _test_dbapi_raw(n, True)


@Profiler.profile
def test_dbapi_raw_w_pool(n):
    """Individual INSERT/COMMIT pairs w/ DBAPI + connection pool"""

    _test_dbapi_raw(n, False)


def _test_dbapi_raw(n, connect):
    compiled = (
        Customer.__table__.insert()
        .values(name=bindparam("name"), description=bindparam("description"))
        .compile(dialect=engine.dialect)
    )

    if compiled.positional:
        args = (
            ("customer name %d" % i, "customer description %d" % i)
            for i in range(n)
        )
    else:
        args = (
            dict(
                name="customer name %d" % i,
                description="customer description %d" % i,
            )
            for i in range(n)
        )
    sql = str(compiled)

    if connect:
        for arg in args:
            # there's no connection pool, so if these were distinct
            # calls, we'd be connecting each time
            conn = engine.pool._creator()
            cursor = conn.cursor()
            cursor.execute(sql, arg)
            cursor.lastrowid
            conn.commit()
            conn.close()
    else:
        for arg in args:
            conn = engine.raw_connection()
            cursor = conn.cursor()
            cursor.execute(sql, arg)
            cursor.lastrowid
            conn.commit()
            conn.close()


if __name__ == "__main__":
    Profiler.main()