当前位置: 首页 > 工具软件 > Asyncpg > 使用案例 >

python 协程 asyncio 连接数据库 连接池 转换为pandasDataFrame 通过aiomysql 和 asyncpg 写好的两种轮子

乜飞航
2023-12-01

(2020年3月16日11:26:30创建)
这两个功能花了好长时间写的,可以长期运行在服务端
支持mysql和postgresql两种数据库
数据库连接池功能
备注:mysql了的已经没问题的,postgresql刚写的不确定是否完美,有建议的可以评论

mysql

#!/usr/bin/python3.8
import aiomysql
import traceback
import pandas as pd


class 数据库连接池DNA:
    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self):
        try:
            # self.lg.logger.debug("will connect mysql~")
            self.pool = await aiomysql.create_pool(
                minsize=5,
                maxsize=10,
                # host= '127.0.0.1'
                # host='192.168.1.72',
                host='localhost',
                port=3306,
                user='用户名',
                password='密码',
                db='数据库',
                charset='utf8',
                autocommit=True)
            # ("创建数据库连接池成功")
            # self.lg.logger.info("创建数据库连接池成功")

        except Exception:

            # self.lg.logger.error("数据库连接池:连接数据库发生错误,查看是否密码账户名错误")
            # self.lg.logger.error('connect error.', exc_info=True)
            pass

    async def getCurosr(self):  # 在getCurosr方法中是从连接池中重新获取了一个可用的连接。
        conn = await self.pool.acquire()
        cur = await conn.cursor()
        return conn, cur

    async def query(self, query, param=None):
        """返回纯正的query数据"""
        conn, cur = await self.getCurosr()
        try:
            # sql语句中 可能将空变量 实体化成 None 字符串
            query = query.replace("None", "null")

            await cur.execute(query, param)
            result = await cur.fetchall()
            # # 9┏--------------------┓"":{"conect":"列表化字段"
            # col_result = cur.description  # 获取查询结果的字段描述
            # columns = []
            # for i in range(len(col_result)):
            #     columns.append(col_result[i][0])  # 获取字段名,以列表形式保存
            # # 9┗--------------------┛}"列表化字段"
            # # 9┏--------------------┓"":{"conect":""
            # df = pd.DataFrame(columns=columns)
            # for i in range(len(result)):
            #     df.loc[i] = list(result[i])  # 按行插入查询到的数据
            # # 9┗--------------------┛}""
            # # 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.
            return result
        except Exception:
            # self.lg.logger.error(traceback.format_exc())
            pass
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

    async def querytopd(self, query, param=None):
        """返回转换成pandasDataFrame 后的数据"""
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            result = await cur.fetchall()
            # 9┏--------------------┓"":{"conect":"列表化字段"
            col_result = cur.description  # 获取查询结果的字段描述
            columns = []
            for i in range(len(col_result)):
                columns.append(col_result[i][0])  # 获取字段名,以列表形式保存
            # 9┗--------------------┛}"列表化字段"
            # 9┏--------------------┓"":{"conect":""
            df = pd.DataFrame(columns=columns)
            for i in range(len(result)):
                df.loc[i] = list(result[i])  # 按行插入查询到的数据
            # 9┗--------------------┛}""
            # 这里原来有将数据表转换成pandas 中更适合统计的数据类型,可是由于不能将数据表中的内容 使用在 json.dumps中,所以放弃使用,改为原始类型的数据表,然后需要统计的时候再转换.
            return df
        except Exception as e:
            # self.lg.logger.error(traceback.format_exc())
            # self.lg.logger.error(f"数据库查询出现错误:详情{e}")
            # self.lg.logger.error(f"导致出错的sql语句:{query}")
            # ("数据库查询出现错误:详情", e)
            # ("导致出错的sql语句:", query)
            pass
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

    async def insert(self, query, param=None):
        conn, cur = await self.getCurosr()
        try:
            # sql语句中 可能将空变量 实体化成 None 字符串
            query = query.replace("None", "null")
            # self.lg.logger.debug(f"sql语句记录:{query}")
            await cur.execute(query, param)
            return
        except Exception:
            # ("出现错误")
            # self.lg.logger.error("出现错误")
            # self.lg.logger.error(traceback.format_exc())
            pass
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

if __name__ == "__main__":
    import asyncio
  
    连接池实体 = 数据库连接池DNA()

    # print(dir(连接池实体))
    loop = asyncio.get_event_loop()
    tasks = [连接池实体.initpool()]
    loop.run_until_complete(asyncio.wait(tasks))

    sql_语句 = 'SELECT * FROM 表名'
    tasks = [连接池实体.quety_to_pd(sql_语句)]
    loop.run_until_complete(asyncio.wait(tasks))


postgresql

#!/usr/bin/python3.8
import asyncpg
import traceback
import pandas as pd


class 数据库连接池DNA(object):
    def __init__(self):
        self.coon = None
        self.pool = None
        # self.lg = lg

    async def initpool(self):
        try:
            # self.lg.logger.debug("即将连接postgresql")
            print("即将连接postgresql")
            self.pool = await asyncpg.create_pool(
                min_size=5,
                max_size=10,
                user='用户名',
                host='192.168.1.109',
                port=5432,
                password="密码",
                database="数据库名")

            # self.lg.logger.info("数据库连接池成功")

        except Exception:
            pass
            # self.lg.logger.error("数据库连接池:连接数据库发生错误,查看是否密码账户名错误")
            # print("数据库连接池:连接数据库发生错误,查看是否密码账户名错误")
            # self.lg.logger.error('connect error.', exc_info=True)
            # print('connect error.', exc_info=True)

    async def quety_to_pd(self, query):
        conn = await self.pool.acquire()
        try:
            def 生成带列名的空pd(单条记录的keys):
                columns = []
                for i in (单条记录的keys):
                    columns.append(i)
                df_空 = pd.DataFrame(columns=columns)
                return df_空

            query = query.replace("None", "null")
            result = await conn.fetch(query)
            if result:
                df = 生成带列名的空pd(result[0].keys())
                for i in range(len(result)):
                    df.loc[i] = list(result[i])  # 按行插入查询到的数据
                print(df)
                return df
            else:
                print("该数据库没有任何记录,所以不生成pdDataFrame")
                return None

        except Exception:
            # self.lg.logger.error(traceback.format_exc())
            # self.lg.logger.error(f"数据库查询出现错误:详情{e}")
            # self.lg.logger.error(f"导致出错的sql语句:{query}")
            pass
        finally:
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)


if __name__ == "__main__":
    import asyncio
  
    连接池实体 = 数据库连接池DNA()

    # print(dir(连接池实体))
    loop = asyncio.get_event_loop()
    tasks = [连接池实体.initpool()]
    loop.run_until_complete(asyncio.wait(tasks))

    sql_语句 = 'SELECT * FROM 表名'
    tasks = [连接池实体.quety_to_pd(sql_语句)]
    loop.run_until_complete(asyncio.wait(tasks))

 类似资料: