涉及库:pymysql、sshtunnel
涉及数据库:MySQL
有时候(常常是工作中),涉及的数据库都是放在远程服务器上的,这个时候我们不能直接进行访问,得需要一个跳板来进行连接访问。
配置如下:
数据库所在远程服务器IP:IP_C
数据库所在远程服务器端口:Port_C
跳板机(通过ssh通道)所在服务器IP:IP_B
跳板机(通过ssh通道)所在服务器I端口:Port_B(22默认)
跳板机(通过ssh通道)所在服务器账号:username_B
跳板机(通过ssh通道)所在服务器密码:password_B
连接数据库的账号:username_sql
连接数据库的密码:password_sql
代码如下:
# _*_ coding:utf-8 _*_
import pymysql
from sshtunnel import SSHTunnelForwarder
with SSHTunnelForwarder(
(IP_B, Port_B), # 跳板机(堡垒机)B配置
ssh_password=password_B,
ssh_username=username_B,
remote_bind_address=(IP_C, Port_C)) as server: # 数据库存放服务器C配置
# 打开数据库连接
db_connect = pymysql.connect(host='127.0.0.1', # 本机主机A的IP(必须是这个)
port=server.local_bind_port,
user=username_sql,
passwd=password_sql,
db=dbname) # 需要连接的数据库的名称
连接远程数据库上面的就完成了,下面介绍下简单的封装,可以方便直接输入数据库名称和查询语句进行查询
# _*_ coding:utf-8 _*_
import pymysql
from sshtunnel import SSHTunnelForwarder
class ExecuteSQL(object):
# 传入需要连接的数据库的名称dbname和待执行的sql语句sql
def __init__(self, dbname, sql):
self.dbname = dbname
self.sql = sql
def execute_sql(self):
results = ''
with SSHTunnelForwarder(
(IP_B, Port_B), # 跳板机(堡垒机)B配置
ssh_password=password_B,
ssh_username=username_B,
remote_bind_address=(IP_C, Port_C)) as server: # 数据库存放服务器C配置
# 打开数据库连接
db_connect = pymysql.connect(host='127.0.0.1', # 本机主机A的IP(必须是这个)
port=server.local_bind_port,
user=username_sql,
passwd=password_sql,
db=self.dbname) # 需要连接的数据库的名称
# 使用cursor()方法获取操作游标
cursor = db_connect.cursor()
try:
# 执行SQL语句
cursor.execute(self.sql)
# 获取所有记录列表
results = cursor.fetchall()
except Exception as data:
print('Error: 执行查询失败,%s' % data)
db_connect.close()
return results
if __name__ == '__main__':
name = dbname
sql = "select * from table "
connect = ExecuteSQL(name, sql)
print(connect.execute_sql())
运行之后就可以获得查询的结果了,如果需要调用,直接导入ExecuteSQL( )就可以使用了。
上面写的封装缺点是一种数据库操作就得写个类,无法实现一次封装就能操作不同的方法,如查询、修改、删除等,因此进行以下封装满足以上需求:
class DB:
def __init__(self, dbname):
self.dbname = dbname
self.server = SSHTunnelForwarder(
('IP_B', Port_B),
ssh_username='username_B',
ssh_password='password_B',
remote_bind_address=('IP_C', Port_C))
self.server.start()
self.conn = pymysql.connect(host='127.0.0.1',
port=self.server.local_bind_port,
user='username_sql',
password='password_sql',
db=self.dbname,
charset='utf8')
print('数据库连接成功!')
self.cur = self.conn.cursor()
print('游标设置成功!')
def __del__(self): # 析构函数,实例删除时触发
self.cur.close()
self.conn.close()
self.server.stop()
def query(self, sql):
print('开始执行查询语句{}'.format(sql))
self.cur.execute(sql)
print('sql{}执行成功!'.format(sql))
return self.cur.fetchall()
def exec(self, sql):
try:
self.cur.execute(sql)
self.conn.commit()
print('sql{}执行成功!'.format(sql))
except Exception as e:
print(str(e))
self.conn.rollback()
写的有些粗糙,欢迎指正。