当前位置: 首页 > 工具软件 > Asyncpg > 使用案例 >

python多进程数据库连接池_Python进程异步连接数据库连接池通过aimysql和asyncpg转换为pandas dataframe编写的两个轮子,python,协程,asyncio,pand...

鲁洋
2023-12-01

#!/usr/bin/python3.8

import aiomysql

import traceback

import pandas as pd

class 数据库连接池DNA:

def __init__(self):

self.coon = None

self.pool = None

async def initpool(self):

try:

# self.lg.logger.debug("will connect mysql~")

self.pool = await aiomysql.create_pool(

minsize=5,

maxsize=10,

# host= '127.0.0.1'

# host='192.168.1.72',

host='localhost',

port=3306,

user='用户名',

password='密码',

db='数据库',

charset='utf8',

autocommit=True)

# ("创建数据库连接池成功")

# self.lg.logger.info("创建数据库连接池成功")

except Exception:

# self.lg.logger.error("数据库连接池:连接数据库发生错误,查看是否密码账户名错误")

# self.lg.logger.error('connect error.', exc_info=True)

pass

async def getCurosr(self): # 在getCurosr方法中是从连接池中重新获取了一个可用的连接。

conn = await self.pool.acquire()

cur = await conn.cursor()

return conn, cur

async def query(self, query, param=None):

"""返回纯正的query数据"""

conn, cur = await self.getCurosr()

try:

# sql语句中 可能将空变量 实体化成 None 字符串

query = query.replace("None", "null")

await cur.execute(query, param)

result = await cur.fetchall()

# # 9┏--------------------┓"":{"conect":"列表化字段"

# col_result = cur.description # 获取查询结果的字段描述

# columns = []

# for i in range(len(col_result)):

# columns.append(col_result[i][0]) # 获取字段名,以列表形式保存

# # 9┗--------------------┛}"列表化字段"

# # 9┏--------------------┓"":{"conect":""

# df = pd.DataFrame(columns=columns)

# for i in range(len(result)):

# df.loc[i] = list(result[i]) # 按行插入查询到的数据

# # 9┗--------------------┛}""

# # 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.

return result

except Exception:

# self.lg.logger.error(traceback.format_exc())

pass

finally:

if cur:

await cur.close()

# 释放掉conn,将连接放回到连接池中

await self.pool.release(conn)

async def querytopd(self, query, param=None):

"""返回转换成pandasDataFrame 后的数据"""

conn, cur = await self.getCurosr()

try:

await cur.execute(query, param)

result = await cur.fetchall()

# 9┏--------------------┓"":{"conect":"列表化字段"

col_result = cur.description # 获取查询结果的字段描述

columns = []

for i in range(len(col_result)):

columns.append(col_result[i][0]) # 获取字段名,以列表形式保存

# 9┗--------------------┛}"列表化字段"

# 9┏--------------------┓"":{"conect":""

df = pd.DataFrame(columns=columns)

for i in range(len(result)):

df.loc[i] = list(result[i]) # 按行插入查询到的数据

# 9┗--------------------┛}""

# 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.

return df

except Exception as e:

# self.lg.logger.error(traceback.format_exc())

# self.lg.logger.error(f"数据库查询出现错误:详情{e}")

# self.lg.logger.error(f"导致出错的sql语句:{query}")

# ("数据库查询出现错误:详情", e)

# ("导致出错的sql语句:", query)

pass

finally:

if cur:

await cur.close()

# 释放掉conn,将连接放回到连接池中

await self.pool.release(conn)

async def insert(self, query, param=None):

conn, cur = await self.getCurosr()

try:

# sql语句中 可能将空变量 实体化成 None 字符串

query = query.replace("None", "null")

# self.lg.logger.debug(f"sql语句记录:{query}")

await cur.execute(query, param)

return

except Exception:

# ("出现错误")

# self.lg.logger.error("出现错误")

# self.lg.logger.error(traceback.format_exc())

pass

finally:

if cur:

await cur.close()

# 释放掉conn,将连接放回到连接池中

await self.pool.release(conn)

if __name__ == "__main__":

import asyncio

连接池实体 = 数据库连接池DNA()

# print(dir(连接池实体))

loop = asyncio.get_event_loop()

tasks = [连接池实体.initpool()]

loop.run_until_complete(asyncio.wait(tasks))

sql_语句 = 'SELECT * FROM 表名'

tasks = [连接池实体.quety_to_pd(sql_语句)]

loop.run_until_complete(asyncio.wait(tasks))

 类似资料: