records库提供简便地处理 SQL 数据库的方法,简单好用。
https://blog.csdn.net/weixin_42725107/article/details/100766593
# 安装records库
pip install records
# 安装 MySQL 驱动程序:
pip install records[mysql]
pip install records mysqlclient
# 安装 PostgreSQL 驱动程序
pip install records[postgresql]
# 安装oracle支持
pip install cx_Oracle
Records 是基于 SQLAlchemy 实现的,所以数据库连接方式参考:https://docs.sqlalchemy.org/en/13/dialects/
参考:https://blog.csdn.net/weixin_42725107/article/details/100766593
# 1)连接sqllite
db = records.Database(‘sqlite:///users.db’)
db = records.Database(‘sqlite:absolute/path/to/file.db’)
# 2)Oracle 数据库连接示例:
## 先安装 cx_Oracle
oracle://root:1234@ORCL
#3)MySQL 数据库连接串示例:
mysql://root:12345@localhost/mydb?charset=utf8
# 4)PostgreSQL 数据库连接串示例:
postgresql://postgres:1234@localhost/mydb
# 5)SQL Server 数据库连接示例:
## 首先安装 pymssql
mssql+pymssql://sa:12345@localhost:1433/mydb
# -*- coding: utf-8 -*-
# @Time : 3/17/21 5:29 下午
# @Author : qiaofei.li
# @FileName: records_db.py
# @Software: PyCharm
# @Blog :https://blog.csdn.net/omaidb
"""
records操作mysql数据的模块
"""
import records
# 数据库查询类
class SqlQuery:
def __init__(self, user, password, host, port: int, db_name):
"""
初始化数据库连接
:param user: 传入数据库用户名
:param password: 传入数据库登录密码
:param host: 传入数据库主机地址
:param port: 传入端口号
:param db_name: 传入数据库名称
"""
self.db = records.Database(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db_name}?charset=utf8',
pool_recycle=3600)
def db_query(self, sql语句):
"""
执行数据库语句功能
:param sql语句:
:return: 以列表方式返回查询结果
"""
# 连接数据库
rows = self.db.query(sql语句)
return rows.all()
def db_query_dict(self, sql语句):
"""
执行数据库查询功能,以字典方式返回结果
:param sql语句:
:return: 以字典返回查询结果
"""
# 连接数据库
rows = self.db.query(sql语句)
return rows.all(as_dict=True)
def db_transaction(self, sql语句1, sql语句2):
"""
事务类执行方法
:param sql语句2:
:param sql语句1:
:return:
"""
with self.db.transaction()as t:
t.query(sql语句1)
print("user1已执行插入...")
t.query(sql语句2)
print("user2已执行插入...") # 如果没有打印user2执行成功的信息,就是自动回滚事务了
def export_query_file(self, sql语句, 导出的路径和文件, 导出格式):
"""
导出查询结果为文件
:param 导出的路径和文件:
:param sql语句:
:param 导出格式: 传入文件格式,json或xlsx,yaml,html
:return:
"""
rows = self.db.query(sql语句)
with open(f'{导出的路径和文件}.{导出格式}', 'wb')as f:
f.write(rows.export(导出格式))
def delte_table(self, table_name):
"""
删除指定的数据表
:param table_name: 传入要删除的表名
:return:
"""
drop_table_sql = f"drop table if exists {table_name};"
self.db.query(drop_table_sql)
def create_table(self, table_name):
"""
创建指定的数据表
:param table_name: 传入要创建的表名
:return:
"""
create_table_sql = f"create table if not exists {table_name}(name varchar(20),age int) default charset = utf8;"
self.db.query(create_table_sql)
# if __name__ == '__main__':
# mytest = SqlQuery('root', '123456', '192.168.255.184', 3306, 'movie')
# sql语句 = 'SELECT * FROM `user`;'
# pprint(mytest.db_query_dict(sql语句))
import records
# 创建数据库连接
db = records.Database('sqlite:///example.db')
# 执行查询语句
rows = db.query('SELECT name, age FROM users WHERE age > :age', age=18)
# 遍历结果集
for row in rows:
print(row.name, row.age)