#coding=utf-8
"""使用DBUtils数据库链接池中的链接,操做数据库
OperationalError: (2006, ‘MySQL server has gone away’)"""
importjsonimportpymysqlimportdatetimefrom DBUtils.PooledDB importPooledDBimportpymysqlclassMysqlClient(object):__pool =None;def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, setsession=None, reset=True,
host='127.0.0.1', port=3306, db='test',
user='root', passwd='123456', charset='utf8mb4'):""":param mincached:链接池中空闲链接的初始数量
:param maxcached:链接池中空闲链接的最大数量
:param maxshared:共享链接的最大数量
:param maxconnections:建立链接池的最大数量
:param blocking:超过最大链接数量时候的表现,为True等待链接数量降低,为false直接报错处理
:param maxusage:单个链接的最大重复使用次数
:param setsession:optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
:param reset:how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param host:数据库ip地址
:param port:数据库端口
:param db:库名
:param user:用户名
:param passwd:密码
:param charset:字符编码"""
if not self.__pool:
self.__class__.__pool =PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
)
self._conn=None
self._cursor=None
self.__get_conn()def __get_conn(self):
self._conn= self.__pool.connection();
self._cursor=self._conn.cursor();def close(self):try:
self._cursor.close()
self._conn.close()exceptException as e:printedef __execute(self, sql, param=()):
count=self._cursor.execute(sql, param)printcountreturncount
@staticmethoddef __dict_datetime_obj_to_str(result_dict):"""把字典里面的datatime对象转成字符串,使json转换不出错"""
ifresult_dict:
result_replace= {k: v.__str__() for k, v in result_dict.items() ifisinstance(v, datetime.datetime)}
result_dict.update(result_replace)returnresult_dictdef select_one(self, sql, param=()):"""查询单个结果"""count= self.__execute(sql, param)
result=self._cursor.fetchone()""":type result:dict"""result= self.__dict_datetime_obj_to_str(result)returncount, resultdef select_many(self, sql, param=()):"""查询多个结果
:param sql: qsl语句
:param param: sql参数
:return: 结果数量和查询结果集"""count= self.__execute(sql, param)
result=self._cursor.fetchall()""":type result:list"""[self.__dict_datetime_obj_to_str(row_dict) for row_dict inresult]returncount, resultdefexecute(self, sql, param=()):
count= self.__execute(sql, param)returncountdefbegin(self):"""开启事务"""self._conn.autocommit(0)def end(self, option='commit'):"""结束事务"""
if option == 'commit':
self._conn.autocommit()else:
self._conn.rollback()if __name__ == "__main__":
mc=MysqlClient()
sql1= 'SELECT * FROM shiji WHERE id = 1'result1=mc.select_one(sql1)print json.dumps(result1[1], ensure_ascii=False)
sql2= 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'param= (2, 3, 4)print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)