当前位置: 首页 > 工具软件 > Asyncpg > 使用案例 >

异步asyncpg

陶健
2023-12-01
# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: asyncpostgres.py
# Month: 六月
# time: 2021/6/21 18:33
""" 官方文档 https://www.postgresql.org/docs/devel/functions-json.html """
from functools import wraps
from asyncpg import create_pool
from asyncio import Lock

create_pool_look = Lock()


def init_decorator():
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            transaction = kwargs.pop("transaction", None) or self.transaction
            await self.create_pool(**self.params)
            async with self._pool.acquire(timeout=self.acquire_timeout) as con:
                if transaction:
                    isolation = kwargs.pop("isolation", None)
                    readonly = kwargs.pop("readonly", False)
                    deferrable = kwargs.pop("deferrable", False)
                    async with con.transaction(isolation=isolation, readonly=readonly, deferrable=deferrable):
                        n = await func(self, __conn__=con, *args, **kwargs)
                else:
                    n = await func(self, __conn__=con, *args, **kwargs)
            return n

        return wrapper

    return decorator


class AsyncPostGre:

    def __init__(
            self,
            dsn=None, *, min_size=10, max_size=10,
            max_queries=50000, max_inactive_connection_lifetime=300.0, setup=None, init=None,
            loop=None, host=None, port=None, user=None, password=None, command_timeout=360,
            database=None, transaction=False, acquire_timeout=360, **connect_kwargs):
        # dsn = "postgres://user:pass@host:port/database?option=value"
        """
        @param dsn:
        @param min_size:
        @param max_size:
        @param max_queries:
        @param max_inactive_connection_lifetime:
        @param setup:
        @param init:
        @param loop:
        @param host:
        @param port:
        @param user:
        @param password:
        @param database:
        @param connect_kwargs:
        @param transaction: bool True  开启事务 查询中传入 transaction 会优先使用传入的 transaction
        passfile
        timeout=60
        statement_cache_size=100
        max_cached_statement_lifetime=300
        max_cacheable_statement_size=1024 * 15
        command_timeout=None
        ssl=None
        connection_class=Connection
        record_class=protocol.Record
        server_settings=None
        """
        self.params = dict(
            dsn=dsn,
            min_size=min_size,
            max_size=max_size,
            max_queries=max_queries,
            max_inactive_connection_lifetime=max_inactive_connection_lifetime,
            setup=setup, init=init, loop=loop,
            database=database, host=host, port=port,
            user=user, password=password, command_timeout=command_timeout,
            **connect_kwargs)
        self._pool = None
        self.transaction = transaction
        self.acquire_timeout = acquire_timeout

    async def create_pool(self, **kwargs):
        if self._pool is None:
            async with create_pool_look:
                if self._pool is None:
                    self.params.update(**kwargs)
                    self._pool = await create_pool(**self.params)
        return self._pool

    @property
    def get_pool(self):
        return self._pool

    async def close(self):
        if self._pool:
            await self._pool.close()
            self._pool = None

    @init_decorator()
    async def query_all_execute(self, sql, *args, **kwargs):
        """
        '''SELECT * FROM "t_product_ispl" WHERE "licenseno" LIKE $1 LIMIT 10 OFFSET 0;'''

        @param sql:
        @param args:a,b,c
        @param kwargs:
        @return:
        """
        con = kwargs.pop("__conn__")
        return await con.fetch(sql, *args, **kwargs)

    @init_decorator()
    async def query_one_execute(self, sql, *args, **kwargs):
        """

        :param sql:
        :param args: ['a', 'b', 'c']
        :param kwargs:
        :return:
        """
        con = kwargs.pop("__conn__")
        return await con.fetchrow(sql, *args, **kwargs)

    @init_decorator()
    async def commit_one_execute(self, sql, *args, **kwargs):
        # execute(insert into girl(name, age, place) values ($1, $2, $3)", '十六夜咲夜', 17, '红魔馆')
        con = kwargs.pop("__conn__")
        return await con.execute(sql, *args, **kwargs)

    @init_decorator()
    async def commit_executemany(self, sql, *args, **kwargs):
        # executemany("insert into girl(name, age, place) values ($1, $2, $3)",
        # [('十六夜咲夜', 17, '红魔馆'), ('琪露诺', 60, '雾之湖')])
        con = kwargs.pop("__conn__")
        return await con.executemany(sql, args=args, **kwargs)

    @init_decorator()
    async def commit_return_id(self, sql, *args, **kwargs):
        """
        此如果只是关注记录是否插入成功,那么推荐使用 executemany 加事务的方式,这样我们只需要传递一个包含元组的列表即可。如果还需要获取插入记录的自增 id,那么需要使用 fetch 加事务的方式(如果是多条 SQL 的话),但是需要事先将 SQL 语句拼接好才行,并且还要使用 PostgreSQL 提供的 returning 字句,否则是不会返回信息的(只能得到一个 None)。
        至于 fetchrow 和 execute,它们只针对于单条,因此建议直接使用 fetch 和 executemany 即可。
        因此想插入多条记录的话, 只能先拼接好 SQL 语句, 并且想返回对应的自增 id 的话, 需要在语句结尾加上 returning id
        # 当然这个是数据库的语法, 否则得到的就是一个 []
        insert into girl(name, age, place) values ('太田顺也', 43, 'japan'), ('zun', 43, 'japan') returning id
        @param sql:
        @param args:
        @param kwargs:
        @return:
        """
        if "returning id" not in sql:
            sql = f"{sql.rstrip(';')} returning id; "
        con = kwargs.pop("__conn__")
        return await con.fetch(sql, *args, **kwargs)

    async def transaction_operation(self, async_func, *args, **kwargs):
        """
        事物操作
        @param async_func: 异步回调函数
            example
                    async def callback(con, *args, **kwargs):
                        await con.executemany(*args, **kwargs)
                        await con.execute(*args, **kwargs)
                        await con.fetchrow(*args, **kwargs)
                        await other_function(*args, **kwargs)
                        return await con.fetch(*args, **kwargs)
        @param args:
        @param kwargs:
        isolation:
            事务级别:
                [read_committed,  # 读取已提交
                serializable,  # 可序列化
                repeatable_read  # 可重复读取
                ]
        @return:
        """
        isolation = kwargs.pop("isolation", None)
        readonly = kwargs.pop("readonly", False)
        deferrable = kwargs.pop("deferrable", False)
        if not callable(async_func):
            raise ValueError("async_func must be callable")
        await self.create_pool(**self.params)
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            async with con.transaction(isolation=isolation, readonly=readonly, deferrable=deferrable):
                result = await async_func(con, *args, **kwargs)
                return result

    async def __aenter__(self):
        self._pool = await self.create_pool(**self.params)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    @init_decorator()
    async def create_index(
            self, index_name, tb_name,
            field_name, index_type='INDEX',
            index_search_type=None,
            **kwargs
    ):
        """

        @param index_name:
        @param tb_name:
        @param field_name: list or str
        @param index_type:
        @param index_search_type: btree、hash、 gist、spgist、 gin以及brin。 默认方法是btree 自定义类型..
            只有 B-tree,GiST,GIN和BRIN索引类型支持多列索引。最多可以指定32列。使用最左匹配原则。
        @param kwargs:
        to_sql  返回sql语句
        @return:
        """

        con = kwargs.pop("__conn__")
        assert index_type.upper() in ["INDEX", 'UNIQUE']
        if isinstance(field_name, str):
            field_name = [field_name]
        sql = f"CREATE {index_type.upper()} IF NOT EXISTS {index_name} ON {tb_name}"
        if index_search_type:
            sql += f" using {index_search_type}"
        sql += f"({','.join(field_name)});"
        if kwargs.pop("to_sql"):
            return sql
        return await con.fetch(sql)

    @init_decorator()
    async def drop_index(self, index_name, **kwargs):
        con = kwargs.pop("__conn__")

        sql = f"drop index {index_name};"
        return await con.fetch(sql)

    @property
    async def get_server_pid(self):
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_server_pid()

    @property
    async def get_server_version(self):
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_server_version()

    @property
    async def get_settings(self):
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_settings()


if __name__ == '__main__':
    import asyncio


    async def callback(con, *args, **kwargs):
        return await con.fetch(*args, **kwargs)


    async def test(query, ):
        try:
            # result2 = await post_gr.create_index("index_name", "tb_name", ["field_name",'a', 'b'], index_type='UNIQUE',to_sql=True,)
            # print(result2)
            # result = await post_gr.query_one_execute(query, )
            # result = await post_gr.transaction_operation(callback, query, params)
            result = await post_gr.get_settings
            print(result)
        finally:
            await post_gr.close()
            pass


    post_gr = AsyncPostGre(password='123456',
                  host='192.168.1.100',
                  user='postgres',
                  database='datacheck-test')
    qu = '''select '{"a":1, "b":2}'::jsonb @> '{"b":2}'::jsonb as tt; '''  # 包含
    qu = '''select * from json_each('{"a":"foo", "b":"bar"}'); '''  # 包含
    loop_event = asyncio.get_event_loop()
    loop_event.run_until_complete(test(qu))

 类似资料: