当前位置: 首页 > 工具软件 > sshtunnel > 使用案例 >

python使用sshtunnel+pymysql连接远程数据库及使用的简单封装

柯冯浩
2023-12-01
涉及库:pymysql、sshtunnel
涉及数据库:MySQL

有时候(常常是工作中),涉及的数据库都是放在远程服务器上的,这个时候我们不能直接进行访问,得需要一个跳板来进行连接访问。

配置如下:
数据库所在远程服务器IP:IP_C
数据库所在远程服务器端口:Port_C
跳板机(通过ssh通道)所在服务器IP:IP_B
跳板机(通过ssh通道)所在服务器I端口:Port_B(22默认)
跳板机(通过ssh通道)所在服务器账号:username_B
跳板机(通过ssh通道)所在服务器密码:password_B
连接数据库的账号:username_sql
连接数据库的密码:password_sql

代码如下:

# _*_ coding:utf-8 _*_
import pymysql
from sshtunnel import SSHTunnelForwarder

with SSHTunnelForwarder(
        (IP_B, Port_B),  # 跳板机(堡垒机)B配置
        ssh_password=password_B,
        ssh_username=username_B,
        remote_bind_address=(IP_C, Port_C)) as server:  # 数据库存放服务器C配置

    # 打开数据库连接
    db_connect = pymysql.connect(host='127.0.0.1',   # 本机主机A的IP(必须是这个)
                                 port=server.local_bind_port, 
                                 user=username_sql,
                                 passwd=password_sql,
                                 db=dbname)  # 需要连接的数据库的名称
                                 

连接远程数据库上面的就完成了,下面介绍下简单的封装,可以方便直接输入数据库名称和查询语句进行查询

# _*_ coding:utf-8 _*_
import pymysql
from sshtunnel import SSHTunnelForwarder

class ExecuteSQL(object):
    
    # 传入需要连接的数据库的名称dbname和待执行的sql语句sql
    def __init__(self, dbname, sql):
        self.dbname = dbname
        self.sql = sql

    def execute_sql(self):
        results = ''
        with SSHTunnelForwarder(
                (IP_B, Port_B),  # 跳板机(堡垒机)B配置
                ssh_password=password_B,
                ssh_username=username_B,
                remote_bind_address=(IP_C, Port_C)) as server:  # 数据库存放服务器C配置

            # 打开数据库连接
            db_connect = pymysql.connect(host='127.0.0.1',  # 本机主机A的IP(必须是这个)
                                         port=server.local_bind_port,
                                         user=username_sql,
                                         passwd=password_sql,
                                         db=self.dbname)  # 需要连接的数据库的名称

            # 使用cursor()方法获取操作游标
            cursor = db_connect.cursor()

            try:
                # 执行SQL语句
                cursor.execute(self.sql)
                # 获取所有记录列表
                results = cursor.fetchall()
            except Exception as data:
                print('Error: 执行查询失败,%s' % data)

            db_connect.close()
            return results


if __name__ == '__main__':
    name = dbname
    sql = "select * from table "
    connect = ExecuteSQL(name, sql)
    print(connect.execute_sql())
    

运行之后就可以获得查询的结果了,如果需要调用,直接导入ExecuteSQL( )就可以使用了。

上面写的封装缺点是一种数据库操作就得写个类,无法实现一次封装就能操作不同的方法,如查询、修改、删除等,因此进行以下封装满足以上需求:

class DB:
    def __init__(self, dbname):
        self.dbname = dbname
        self.server = SSHTunnelForwarder(
                ('IP_B', Port_B),
                ssh_username='username_B',
                ssh_password='password_B',
                remote_bind_address=('IP_C', Port_C))
        self.server.start()
        self.conn = pymysql.connect(host='127.0.0.1',
                                    port=self.server.local_bind_port,
                                    user='username_sql',
                                    password='password_sql',
                                    db=self.dbname,
                                    charset='utf8')
        print('数据库连接成功!')
        self.cur = self.conn.cursor()
        print('游标设置成功!')
        
    def __del__(self):   # 析构函数,实例删除时触发
        self.cur.close()
        self.conn.close()
        self.server.stop()
        
    def query(self, sql):
        print('开始执行查询语句{}'.format(sql))
        self.cur.execute(sql)
        print('sql{}执行成功!'.format(sql))
        return self.cur.fetchall()

    def exec(self, sql):
        try:
            self.cur.execute(sql)
            self.conn.commit()
            print('sql{}执行成功!'.format(sql))
        except Exception as e:
            print(str(e))
            self.conn.rollback()

写的有些粗糙,欢迎指正。

 类似资料: