python DBUtils 线程池 连接 Postgresql(多线程公用线程池,DB-API : psycopg2)

康照
2023-12-01

一、DBUtils

DBUtils 是一套允许线程化 Python 程序可以安全和有效的访问数据库的模块,DBUtils提供两种外部接口: PersistentDB :提供线程专用的数据库连接,并自动管理连接。 PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

操作数据库模板:

  1 import datetime
  2 import sys
  3 import os
  4 import configparser
  5 import logging
  6 import psycopg2
  7 
  8 from  DBUtils.PooledDB import PooledDB
  9 
 10 
 11   
 12 
 13 class  DatabaseOperator(object):
 14     '''
 15     class for database operator
 16     '''
 17 
 18 
 19     def __init__(self, 
 20         database_config_path, database_config=None):
 21         '''
 22         Constructor
 23         '''
 24         self._database_config_path = database_config_path
 25         
 26         # load database configuration
 27         if not database_config :
 28             self._database_config = self.parse_postgresql_config(database_config_path)
 29         else:
 30             self._database_config = database_config
 31         self._pool = None
 32 
 33     def database_config_empty(self):
 34         if self._database_config:
 35             return False
 36         else:
 37             return True
 38         
 39     def parse_postgresql_config(self, database_config_path=None):
 40         '''解析pei数据库配置文件
 41             参数
 42         ---------
 43         arg1 : conf_file
 44                         数据库配置文件路径
 45             返回值
 46         --------
 47         dict
 48                         解析配置属性dict--config
 49     
 50             示例
 51         --------
 52                 无
 53        '''
 54         if database_config_path == None and self._database_config_path != None:
 55             database_config_path = self._database_config_path
 56         if not os.path.isfile(database_config_path):
 57             sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path))
 58         parser =  configparser.SafeConfigParser()
 59         parser.read(database_config_path)
 60         config = {}
 61         config['database'] = parser.get('UniMonDB', 'Database')
 62         config['db_user'] = parser.get('UniMonDB', 'UserName')
 63         config['db_passwd'] = parser.get('UniMonDB', 'Password')
 64         config['db_port'] = parser.getint('UniMonDB', 'Port')
 65         config['db_host'] = parser.get('UniMonDB', 'Servername')
 66         self._database_config = config
 67         
 68         return config  
 69     
 70     
 71     def get_pool_conn(self):
 72         
 73         if not self._pool:
 74             self.init_pgsql_pool()
 75         return self._pool.connection()
 76         
 77     def init_pgsql_pool(self):
 78         '''利用数据库属性连接数据库
 79                     参数
 80                 ---------
 81                 arg1 : config
 82                                 数据库配置属性
 83                     返回值
 84                 --------
 85                 
 86                     示例
 87                 --------
 88                         无
 89         '''
 90         # 字典config是否为空
 91         config = self.parse_postgresql_config()
 92         POSTGREIP = config['db_host']
 93         POSTGREPORT = config['db_port']
 94         POSTGREDB = config['database']
 95         POSTGREUSER = config['db_user']
 96         POSTGREPASSWD = config['db_passwd']
 97         try:
 98             logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(POSTGREIP, datetime.datetime.now()))
 99             
100             pool = PooledDB(
101                 creator=psycopg2,  # 使用链接数据库的模块mincached
102                 maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
103                 mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
104                 maxcached=4,  # 链接池中最多闲置的链接,0和None不限制
105                 blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
106                 maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
107                 setsession=[],  # 开始会话前执行的命令列表。
108                 host=POSTGREIP,
109                 port=POSTGREPORT,
110                 user=POSTGREUSER,
111                 password=POSTGREPASSWD,
112                 database=POSTGREDB)
113             self._pool = pool    
114             logging.info('SUCCESS: create postgresql success.\n')
115                     
116         except Exception as e:
117             logging.error('ERROR: create postgresql pool failed:{0}\n')
118             self.close_db_cursor()
119             sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
120 
121             
122     def pg_select_operator(self, sql):
123         '''进行查询操作,函数返回前关闭cursor,conn
124                     参数
125                 ---------
126                 arg1 : sql查询语句
127                     返回值
128                 --------
129                 list:result
130                                         类型为list的查询结果:result
131             
132                     示例
133                 --------
134                         无
135         '''
136         # 执行查询
137         try:
138             conn = self.get_pool_conn()
139             cursor = conn.cursor()      
140             cursor.execute(sql)
141             result = cursor.fetchall()
142         except Exception as e:
143             logging.error('ERROR: execute  {0} causes error'.format(sql))
144             sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
145         finally:
146             cursor.close()
147             conn.close()       
148         return result
149 
150     def test_pool_con(self):
151         sql = 'select * from tbl_devprofile'
152         result = self.pg_select_operator(sql)
153         print(result)
154         
155     def pg_insert_operator(self, sql):
156         
157         result = False
158         try:
159             conn = self.get_pool_conn()
160             cursor = conn.cursor()      
161             cursor.execute(sql)
162             result =  True
163         except Exception as e:
164             logging.error('ERROR: execute  {0} causes error'.format(sql))
165             sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
166         finally:
167             cursor.close()
168             conn.commit()
169             conn.close()    
170         return result
171     
172     def pg_update_operator(self, sql):
173         
174         result = False
175         try:
176             conn = self.get_pool_conn()
177             cursor = conn.cursor()      
178             cursor.execute(sql)
179             result =  True
180         except Exception as e:
181             logging.error('ERROR: execute  {0} causes error'.format(sql))
182             sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
183         finally:
184             cursor.close()
185             conn.commit()
186             conn.close()    
187         return result
188 
189     def pg_delete_operator(self, sql):
190         result = False
191         # 执行查询
192         try:
193             conn = self.get_pool_conn()
194             cursor = conn.cursor()   
195             cursor.execute(sql)
196             result =  True
197         except Exception as e:
198             logging.error('ERROR: execute  {0} causes error'.format(sql))
199             sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
200         finally:
201             cursor.close()
202             conn.commit()
203             conn.close()       
204         return result
205 
206     
207     def close_pool(self):
208         '''关闭pool
209                     参数
210                 ---------
211                         无 
212 
213                     返回值
214                 --------
215                         无
216                     示例
217                 --------
218                         无
219         '''
220         if self._pool != None:
221             self._pool.close()
222             
223 if __name__ == '__main__':
224     path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf"
225     db = DatabaseOperator(
226     database_config_path=path)
227     db.test_pool_con()

二、多线程

原理:创建多个线程类,多个线程类共享一个队里Queue,每一个线程类可以操作数据库

 1 from threading import Thread
 2     
 3 class Worker(Thread):
 4     def __init__(self, queue):
 5         Thread.__init__(self)
 6         self.queue = queue
 7  
 8     def run(self):
 9         while True:
10             # Get the work from the queue and expand the tuple
11             # 从队列中获取任务
12             database_operator, device, stand_alone_result = self.queue.get()
13             operateResult(database_operator, device, stand_alone_result)
14             # 任务执行完之后要通知队列
15             self.queue.task_done()

填充队列

 1     # 使用队列多线程
 2     logging.info('begin to update all device risk score by multi_processing.\n')
 3     from queue import Queue
 4     queue = Queue()
 5     # 六个线程,每个线程共享一个队列
 6     for _ in range(6):
 7         worker = Worker(queue)
 8         worker.setDaemon(True)
 9         worker.start()
10           
11     for record in all_devid:
12         device = record[0]
13         devtype = record[1]
14         all_countlist = all_dict.get(device)
15         stand_alone_result = device_assess(all_countlist)
16         if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):
17             stand_alone_result *= 0.8
18         # 将设备风险评分数据保存到数据库中
19         queue.put((database_operator, device, stand_alone_result))
20      
21     #等待队列任务执行完
22     queue.join()
23 
24 
25 def operateResult(database_operator, device, stand_alone_result):
26     '''
27     函数名称: device_assess
28     描述:  保存单台设备分数到数据库
29     调用: 无
30     被调用:  main
31     被访问的表: tbl_devprofile
32     被修改的表: 无
33     输入参数: database_operator, device:设备uid, stand_alone_result:单台设备风险分数
34     输出参数:无
35     返回值: 单台设备风险分数值
36     其它:  无
37     '''
38     import time
39     find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid='{0}';".format(device)
40     isExistRecord = database_operator.pg_select_operator(find_profile_sql)
41     #currentTime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
42     currentTime=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
43     if len(isExistRecord) > 0:
44         updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime='{1}' \
45                               WHERE uiddevrecordid='{2}';".format(stand_alone_result, currentTime, device)
46         database_operator.pg_update_operator(updata_profile_sql)
47     else:
48         insert_profile_sql = "INSERT INTO tbl_devprofile VALUES('{0}',NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},'{2}');".format(
49             device, stand_alone_result, currentTime)
50         database_operator.pg_insert_operator(insert_profile_sql)

使用单线程时,执行完代码花费20s左右,使用多线程时花费5s左右。

 

 

Reference:

[1] https://blog.csdn.net/zhaihaifei/article/details/54016939

[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral

[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多线程)

[4] http://www.lpfrx.com/archives/4431/

[5] https://www.cnblogs.com/95lyj/p/9047554.html

转载于:https://www.cnblogs.com/hoojjack/p/10292140.html

 类似资料: