当前位置: 首页 > 工具软件 > sshtunnel > 使用案例 >

python通过sshtunnel连接跳板机访问mysql数据库

蒙弘图
2023-12-01

访问数据时,需要通过ssh跳转,python脚本中可以引入sshtunnel模块,先连接跳板机,再建立数据库连接,实现python访问数据库。直接上代码,方便你我他!需要的同学可以参考。

1.连接ssh后连接数据库

import sshtunnel
import pymysql
import time
sshtunnel.TUNNEL_TIMEOUT =60000
class SSH_to_Mysql(object):
    #初始化方法,ssh的公钥文件路径传参,ssh访问信息、数据库的访问信息以字典方式传参
    def __init__(self,file,ssh_ms,mysqlconf):
        self.mysqlconf = mysqlconf
        self.server =  sshtunnel.SSHTunnelForwarder(
            (ssh_ms["ssh_host"], ssh_ms['ssh_port']),
            ssh_username=ssh_ms["ssh_user"],
            ssh_password=ssh_ms["password"],
            ssh_pkey=file,
            remote_bind_address=(self.mysqlconf["host"], self.mysqlconf["port"])
        )
        self.server.start()
        local_port = self.server.local_bind_port
        self.server.set_keepalive
        self.conn = pymysql.connect(host='127.0.0.1',
                               port=local_port,
                               user=self.mysqlconf["user"],
                               password=self.mysqlconf["passwd"],
                               db=self.mysqlconf['db'],
                               charset='utf8',
                               cursorclass=pymysql.cursors.DictCursor
                               )

        self.cursor = self.conn.cursor()
    def __del__(self):
        self.cursor.close()
        self.conn.close()
        

    def excute_sql(self,sql):
        results = []
        counts = self.cursor.execute(sql)
        rows = self.cursor.fetchall()
        #根据数据需要,处理sql执行结果
        for result in rows:
            # print(result)
            s = []
            for k in result:
                s.append(result[k])
            str1 = str(s)
            str2 = str1.replace("[","",1)
            str2 = str2[:-1]+""

            results.append(str2)
        return results

2.定义主函数,初始化数据库连接并执行sql语句

2.1使用用户名、密码访问ssh,公钥文件不传

def main():
        #字典存储ssh配置信息
        ssh_ms = {"ssh_host": "xx.xx.xx.xxx", "ssh_port": xxxx, "ssh_user": "xxxx",
                           'password': 'xxxx'}
        sql="SELECT code FROM `student` where name like '%张三%'"
        #字典存储数据库配置信息
        mysql_conf = {'host': '127.0.0.1', 'user': '数据库用户名', 'passwd': '数据库密码', 'port': xxx, 'db': 'dbname'}
        #初始化自定义类,将ssh配置信息、mysql配置信息以参数方式传递
        dc_new = SSH_to_Mysql(file="", ssh_ms=ssh_ms, mysqlconf=mysql_conf)
        #执行sql语句
        dc_new.excute_sql(sql)


if __name__ == '__main__':
    main()

2.2使用公钥访问ssh,ssh密码为空

def main():
        #字典存储ssh配置信息,密码为空,公钥文件中定义
        ssh_ms = {"ssh_host": "ip", "ssh_port": 端口, "ssh_user": "用户名", 'password': ''}
        #公钥文件路径
        pk_file = r"C:\Users\xxx\Desktop\xxx.pem"
        sql = "SELECT student_name,score FROM `student_score_infos` where name like '%zhangsan%'"
        #字典存储数据库配置信息
        mysql_info = {'host': 'ip', 'user': '用户名', 'passwd': '密码', 'port': 端口, 'db': '表名'}
        dc_new = SSH_to_Mysql(file=pk_file, ssh_ms=ssh_ms, mysqlconf=mysql_info)
        dc_new.excute_sql(sql)

if __name__ == '__main__':
    main()

 类似资料: