#!/usr/bin/python3.8
import aiomysql
import traceback
import pandas as pd
class 数据库连接池DNA:
def __init__(self):
self.coon = None
self.pool = None
async def initpool(self):
try:
# self.lg.logger.debug("will connect mysql~")
self.pool = await aiomysql.create_pool(
minsize=5,
maxsize=10,
# host= '127.0.0.1'
# host='192.168.1.72',
host='localhost',
port=3306,
user='用户名',
password='密码',
db='数据库',
charset='utf8',
autocommit=True)
# ("创建数据库连接池成功")
# self.lg.logger.info("创建数据库连接池成功")
except Exception:
# self.lg.logger.error("数据库连接池:连接数据库发生错误,查看是否密码账户名错误")
# self.lg.logger.error('connect error.', exc_info=True)
pass
async def getCurosr(self): # 在getCurosr方法中是从连接池中重新获取了一个可用的连接。
conn = await self.pool.acquire()
cur = await conn.cursor()
return conn, cur
async def query(self, query, param=None):
"""返回纯正的query数据"""
conn, cur = await self.getCurosr()
try:
# sql语句中 可能将空变量 实体化成 None 字符串
query = query.replace("None", "null")
await cur.execute(query, param)
result = await cur.fetchall()
# # 9┏--------------------┓"":{"conect":"列表化字段"
# col_result = cur.description # 获取查询结果的字段描述
# columns = []
# for i in range(len(col_result)):
# columns.append(col_result[i][0]) # 获取字段名,以列表形式保存
# # 9┗--------------------┛}"列表化字段"
# # 9┏--------------------┓"":{"conect":""
# df = pd.DataFrame(columns=columns)
# for i in range(len(result)):
# df.loc[i] = list(result[i]) # 按行插入查询到的数据
# # 9┗--------------------┛}""
# # 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.
return result
except Exception:
# self.lg.logger.error(traceback.format_exc())
pass
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def querytopd(self, query, param=None):
"""返回转换成pandasDataFrame 后的数据"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
result = await cur.fetchall()
# 9┏--------------------┓"":{"conect":"列表化字段"
col_result = cur.description # 获取查询结果的字段描述
columns = []
for i in range(len(col_result)):
columns.append(col_result[i][0]) # 获取字段名,以列表形式保存
# 9┗--------------------┛}"列表化字段"
# 9┏--------------------┓"":{"conect":""
df = pd.DataFrame(columns=columns)
for i in range(len(result)):
df.loc[i] = list(result[i]) # 按行插入查询到的数据
# 9┗--------------------┛}""
# 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.
return df
except Exception as e:
# self.lg.logger.error(traceback.format_exc())
# self.lg.logger.error(f"数据库查询出现错误:详情{e}")
# self.lg.logger.error(f"导致出错的sql语句:{query}")
# ("数据库查询出现错误:详情", e)
# ("导致出错的sql语句:", query)
pass
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def insert(self, query, param=None):
conn, cur = await self.getCurosr()
try:
# sql语句中 可能将空变量 实体化成 None 字符串
query = query.replace("None", "null")
# self.lg.logger.debug(f"sql语句记录:{query}")
await cur.execute(query, param)
return
except Exception:
# ("出现错误")
# self.lg.logger.error("出现错误")
# self.lg.logger.error(traceback.format_exc())
pass
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
if __name__ == "__main__":
import asyncio
连接池实体 = 数据库连接池DNA()
# print(dir(连接池实体))
loop = asyncio.get_event_loop()
tasks = [连接池实体.initpool()]
loop.run_until_complete(asyncio.wait(tasks))
sql_语句 = 'SELECT * FROM 表名'
tasks = [连接池实体.quety_to_pd(sql_语句)]
loop.run_until_complete(asyncio.wait(tasks))