# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: asyncpostgres.py
# Month: June
# time: 2021/6/21 18:33
""" Official PostgreSQL JSON function reference: https://www.postgresql.org/docs/devel/functions-json.html """
from functools import wraps
from asyncpg import create_pool
from asyncio import Lock
# Module-level asyncio lock taken by AsyncPostGre.create_pool() around pool
# construction, so that concurrent first calls do not each build a pool.
# Being module-level, it is shared by every AsyncPostGre instance.
# NOTE(review): "look" is presumably a typo for "lock"; the name is kept as-is
# because create_pool() references it.
create_pool_look = Lock()
def init_decorator():
    """Build a decorator that hands a pooled connection to a query method.

    The decorated coroutine must be a method of an object that exposes
    ``create_pool(**params)``, ``params``, ``_pool``, ``transaction`` and
    ``acquire_timeout``. On every call the wrapper makes sure the pool
    exists, acquires a connection, optionally opens a transaction, and calls
    the wrapped method with the connection injected as the ``__conn__``
    keyword argument.

    Keyword arguments consumed by the wrapper (never forwarded to the
    wrapped method):
        transaction: per-call override of ``self.transaction``. Any explicit
            value other than ``None`` (including ``False``) wins over the
            instance default.
        isolation, readonly, deferrable: passed to ``con.transaction()``
            when a transaction is opened.

    @return: the decorator to apply to an ``async def`` method.
    """
    # Private sentinel so "not passed" can be told apart from an explicit False.
    not_given = object()

    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            use_transaction = kwargs.pop("transaction", not_given)
            if use_transaction is not_given or use_transaction is None:
                use_transaction = self.transaction

            # Always strip the transaction options: when no transaction is
            # opened they must not leak into the driver call made by ``func``.
            isolation = kwargs.pop("isolation", None)
            readonly = kwargs.pop("readonly", False)
            deferrable = kwargs.pop("deferrable", False)

            await self.create_pool(**self.params)
            async with self._pool.acquire(timeout=self.acquire_timeout) as con:
                if not use_transaction:
                    return await func(self, *args, __conn__=con, **kwargs)
                async with con.transaction(
                        isolation=isolation, readonly=readonly, deferrable=deferrable):
                    return await func(self, *args, __conn__=con, **kwargs)

        return wrapper

    return decorator
class AsyncPostGre:
    """Small asyncpg wrapper that owns a lazily created connection pool.

    Query helpers decorated with ``init_decorator`` receive a pooled
    connection through the ``__conn__`` keyword argument. The instance can
    also be used as an async context manager, which creates the pool on
    entry and closes it on exit.
    """

    def __init__(
            self,
            dsn=None, *, min_size=10, max_size=10,
            max_queries=50000, max_inactive_connection_lifetime=300.0, setup=None, init=None,
            loop=None, host=None, port=None, user=None, password=None, command_timeout=360,
            database=None, transaction=False, acquire_timeout=360, **connect_kwargs):
        """Store the pool settings; no connection is opened here.

        Everything except ``transaction`` and ``acquire_timeout`` is collected
        into ``self.params`` and later passed to asyncpg's ``create_pool``.

        @param dsn: e.g. "postgres://user:pass@host:port/database?option=value"
        @param min_size: forwarded to ``create_pool``
        @param max_size: forwarded to ``create_pool``
        @param max_queries: forwarded to ``create_pool``
        @param max_inactive_connection_lifetime: forwarded to ``create_pool``
        @param setup: forwarded to ``create_pool``
        @param init: forwarded to ``create_pool``
        @param loop: forwarded to ``create_pool``
        @param host: forwarded to ``create_pool``
        @param port: forwarded to ``create_pool``
        @param user: forwarded to ``create_pool``
        @param password: forwarded to ``create_pool``
        @param command_timeout: forwarded to ``create_pool`` (defaults to 360 here)
        @param database: forwarded to ``create_pool``
        @param transaction: bool; True runs every decorated query inside a
            transaction. A ``transaction`` keyword passed to an individual
            query is meant to take priority over this default.
        @param acquire_timeout: timeout used when acquiring a connection
            from the pool
        @param connect_kwargs: any further connection options, forwarded
            unchanged (for example passfile, timeout, statement_cache_size,
            max_cached_statement_lifetime, max_cacheable_statement_size, ssl,
            connection_class, record_class, server_settings)
        """
        self.params = dict(
            dsn=dsn,
            min_size=min_size,
            max_size=max_size,
            max_queries=max_queries,
            max_inactive_connection_lifetime=max_inactive_connection_lifetime,
            setup=setup, init=init, loop=loop,
            database=database, host=host, port=port,
            user=user, password=password, command_timeout=command_timeout,
            **connect_kwargs)
        self._pool = None
        self.transaction = transaction
        self.acquire_timeout = acquire_timeout

    async def create_pool(self, **kwargs):
        """Create the pool on first use and return it.

        @param kwargs: merged into ``self.params`` before the pool is built;
            ignored once a pool already exists.
        @return: the pool stored on this instance
        """
        if self._pool is None:
            async with create_pool_look:
                # Re-check after acquiring the lock: another task may have
                # built the pool while this one was waiting.
                if self._pool is None:
                    self.params.update(**kwargs)
                    self._pool = await create_pool(**self.params)
        return self._pool

    @property
    def get_pool(self):
        """Return the current pool, or None when none has been created."""
        return self._pool

    async def close(self):
        """Close the pool if there is one and forget it."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    @init_decorator()
    async def query_all_execute(self, sql, *args, **kwargs):
        """Run a query with ``con.fetch`` and return its result.

        Example SQL:
            SELECT * FROM "t_product_ispl" WHERE "licenseno" LIKE $1 LIMIT 10 OFFSET 0;

        @param sql: SQL text using $1, $2, ... placeholders
        @param args: positional query arguments, e.g. a, b, c
        @param kwargs: extra keyword arguments forwarded to ``con.fetch``
            (``__conn__`` is injected by the decorator)
        @return: whatever ``con.fetch`` returns
        """
        con = kwargs.pop("__conn__")
        return await con.fetch(sql, *args, **kwargs)

    @init_decorator()
    async def query_one_execute(self, sql, *args, **kwargs):
        """Run a query with ``con.fetchrow`` and return its result.

        @param sql: SQL text using $1, $2, ... placeholders
        @param args: positional query arguments, e.g. 'a', 'b', 'c'
        @param kwargs: extra keyword arguments forwarded to ``con.fetchrow``
        @return: whatever ``con.fetchrow`` returns
        """
        con = kwargs.pop("__conn__")
        return await con.fetchrow(sql, *args, **kwargs)

    @init_decorator()
    async def commit_one_execute(self, sql, *args, **kwargs):
        """Run a single statement with ``con.execute``.

        Example:
            commit_one_execute(
                "insert into girl(name, age, place) values ($1, $2, $3)",
                'name', 17, 'place')

        @param sql: SQL text using $1, $2, ... placeholders
        @param args: positional query arguments
        @param kwargs: extra keyword arguments forwarded to ``con.execute``
        @return: whatever ``con.execute`` returns
        """
        con = kwargs.pop("__conn__")
        return await con.execute(sql, *args, **kwargs)

    @staticmethod
    def _rows_for_executemany(args):
        """Return the row sequence that ``con.executemany`` should iterate.

        Callers may pass the rows either as separate positional arguments
        (``f(sql, row1, row2)``) or as one list of rows (``f(sql, [row1, row2])``).
        In the second form ``args`` is a 1-tuple wrapping the real list, which
        would otherwise be treated as a single row, so it is unwrapped here.
        """
        if len(args) == 1:
            only = args[0]
            if (isinstance(only, (list, tuple)) and only
                    and all(isinstance(row, (list, tuple)) for row in only)):
                return only
        return args

    @init_decorator()
    async def commit_executemany(self, sql, *args, **kwargs):
        """Run one statement once per argument row with ``con.executemany``.

        Examples (both forms are accepted):
            commit_executemany(
                "insert into girl(name, age, place) values ($1, $2, $3)",
                [('a', 17, 'x'), ('b', 60, 'y')])
            commit_executemany(
                "insert into girl(name, age, place) values ($1, $2, $3)",
                ('a', 17, 'x'), ('b', 60, 'y'))

        @param sql: SQL text using $1, $2, ... placeholders
        @param args: the argument rows, see the examples above
        @param kwargs: extra keyword arguments forwarded to ``con.executemany``
        @return: whatever ``con.executemany`` returns
        """
        con = kwargs.pop("__conn__")
        rows = self._rows_for_executemany(args)
        return await con.executemany(sql, args=rows, **kwargs)

    @init_decorator()
    async def commit_return_id(self, sql, *args, **kwargs):
        """Run an insert with ``con.fetch`` so the generated ids come back.

        If you only care whether rows were inserted, ``executemany`` inside a
        transaction is the simpler choice: pass a list of tuples. To get the
        auto-increment ids back, the statement has to be fully assembled
        beforehand and must end with PostgreSQL's ``returning id`` clause;
        this method appends that clause when the SQL does not contain it.

        Example:
            insert into girl(name, age, place)
            values ('a', 43, 'japan'), ('b', 43, 'japan') returning id

        @param sql: SQL text; ``returning id`` is appended if missing
        @param args: positional query arguments
        @param kwargs: extra keyword arguments forwarded to ``con.fetch``
        @return: whatever ``con.fetch`` returns
        """
        con = kwargs.pop("__conn__")
        # Compare case-insensitively and with whitespace collapsed, so that
        # "RETURNING id" or "returning   id" does not get a second clause.
        normalized = " ".join(sql.split()).lower()
        if "returning id" not in normalized:
            # Drop a trailing ";" even when whitespace follows it.
            sql = f"{sql.rstrip().rstrip(';').rstrip()} returning id; "
        return await con.fetch(sql, *args, **kwargs)

    async def transaction_operation(self, async_func, *args, **kwargs):
        """Run a user callback inside a single transaction on one connection.

        Example callback:
            async def callback(con, *args, **kwargs):
                await con.executemany(*args, **kwargs)
                await con.execute(*args, **kwargs)
                await con.fetchrow(*args, **kwargs)
                await other_function(*args, **kwargs)
                return await con.fetch(*args, **kwargs)

        @param async_func: async callable invoked as
            ``async_func(con, *args, **kwargs)``
        @param args: positional arguments forwarded to the callback
        @param kwargs: keyword arguments forwarded to the callback, except
            for the following, which are passed to ``con.transaction()``:
            isolation: transaction isolation level, one of
                read_committed, serializable, repeatable_read
            readonly: defaults to False
            deferrable: defaults to False
        @return: the callback's return value
        @raise ValueError: if ``async_func`` is not callable
        """
        isolation = kwargs.pop("isolation", None)
        readonly = kwargs.pop("readonly", False)
        deferrable = kwargs.pop("deferrable", False)
        if not callable(async_func):
            raise ValueError("async_func must be callable")
        await self.create_pool(**self.params)
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            async with con.transaction(isolation=isolation, readonly=readonly, deferrable=deferrable):
                result = await async_func(con, *args, **kwargs)
        return result

    async def __aenter__(self):
        """Create the pool (if needed) and return this instance."""
        self._pool = await self.create_pool(**self.params)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the pool when leaving the ``async with`` block."""
        await self.close()

    @init_decorator()
    async def create_index(
            self, index_name, tb_name,
            field_name, index_type='INDEX',
            index_search_type=None,
            **kwargs
    ):
        """Build, and unless ``to_sql`` is set run, a CREATE INDEX statement.

        The identifiers are interpolated into the SQL text as given, so they
        must come from trusted code, never from user input.

        @param index_name: name of the index
        @param tb_name: table the index is created on
        @param field_name: list or str; column name(s) to index
        @param index_type: 'INDEX' or 'UNIQUE' (case-insensitive)
        @param index_search_type: index method for the USING clause, e.g.
            btree, hash, gist, spgist, gin, brin or a custom one; omitted
            from the statement when empty. Per the original author's note,
            only B-tree, GiST, GIN and BRIN support multi-column indexes
            (at most 32 columns, leftmost-prefix matching).
        @param kwargs:
            to_sql: when truthy, return the SQL string instead of running it
        @return: the SQL string when ``to_sql`` is truthy, otherwise whatever
            ``con.fetch`` returns
        @raise AssertionError: if ``index_type`` is not 'INDEX' or 'UNIQUE'
        """
        con = kwargs.pop("__conn__")
        # to_sql is optional; a missing key must not raise KeyError.
        to_sql = kwargs.pop("to_sql", False)
        kind = index_type.upper()
        if kind not in ("INDEX", "UNIQUE"):
            # Raised explicitly (not via ``assert``) so the check survives
            # ``python -O``; AssertionError is kept for backward compatibility.
            raise AssertionError(f"index_type must be 'INDEX' or 'UNIQUE', got {index_type!r}")
        if isinstance(field_name, str):
            field_name = [field_name]
        # A unique index is spelled "CREATE UNIQUE INDEX", not "CREATE UNIQUE".
        create_clause = "CREATE UNIQUE INDEX" if kind == "UNIQUE" else "CREATE INDEX"
        sql = f"{create_clause} IF NOT EXISTS {index_name} ON {tb_name}"
        if index_search_type:
            sql += f" using {index_search_type}"
        sql += f"({','.join(field_name)});"
        if to_sql:
            return sql
        return await con.fetch(sql)

    @init_decorator()
    async def drop_index(self, index_name, **kwargs):
        """Run ``drop index <index_name>;``.

        The name is interpolated into the SQL text as given, so it must come
        from trusted code.

        @param index_name: name of the index to drop
        @param kwargs: only ``__conn__`` (injected by the decorator) is used
        @return: whatever ``con.fetch`` returns
        """
        con = kwargs.pop("__conn__")
        sql = f"drop index {index_name};"
        return await con.fetch(sql)

    @property
    async def get_server_pid(self):
        """Awaitable property: ``con.get_server_pid()`` of a pooled connection.

        Use as ``await obj.get_server_pid`` (no call parentheses).
        """
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_server_pid()

    @property
    async def get_server_version(self):
        """Awaitable property: ``con.get_server_version()`` of a pooled connection.

        Use as ``await obj.get_server_version`` (no call parentheses).
        """
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_server_version()

    @property
    async def get_settings(self):
        """Awaitable property: ``con.get_settings()`` of a pooled connection.

        Use as ``await obj.get_settings`` (no call parentheses).
        """
        await self.create_pool()
        async with self._pool.acquire(timeout=self.acquire_timeout) as con:
            return con.get_settings()
if __name__ == '__main__':
    import asyncio

    # Manual smoke test; it needs a reachable PostgreSQL server matching the
    # connection settings below.
    post_gr = AsyncPostGre(
        password='123456',
        host='192.168.1.100',
        user='postgres',
        database='datacheck-test',
    )

    async def callback(con, *args, **kwargs):
        """Sample callback for ``transaction_operation``: a single fetch."""
        return await con.fetch(*args, **kwargs)

    async def test(query, ):
        """Print the server settings, then always close the pool."""
        # Other calls that can be tried here instead:
        #   await post_gr.create_index("index_name", "tb_name",
        #                              ["field_name", 'a', 'b'],
        #                              index_type='UNIQUE', to_sql=True)
        #   await post_gr.query_one_execute(query)
        #   await post_gr.transaction_operation(callback, query, params)
        try:
            result = await post_gr.get_settings
            print(result)
        finally:
            await post_gr.close()

    # Sample JSON queries; only the last assignment is handed to test().
    qu = '''select '{"a":1, "b":2}'::jsonb @> '{"b":2}'::jsonb as tt; '''  # jsonb containment (@>)
    qu = '''select * from json_each('{"a":"foo", "b":"bar"}'); '''  # json_each over a JSON object

    loop_event = asyncio.get_event_loop()
    loop_event.run_until_complete(test(qu))