当前位置: 首页 > 编程笔记 >

Python 实现数据库(SQL)更新脚本的生成方法

梁锋
2023-03-14
本文向大家介绍Python 实现数据库(SQL)更新脚本的生成方法,包括了Python 实现数据库(SQL)更新脚本的生成方法的使用技巧和注意事项,需要的朋友参考一下

我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼。因此我就试着用Python来实现自动的生成更新脚本,以免我这烂记性,记不住事。

  主要操作如下:

  1.在原先 basedao.py 中添加如下方法,这样旧能很方便的获取数据库的数据,为测试数据库和生产数据库做对比打下了基础。

def select_database_struts(self):
  '''
  查找当前连接配置中的数据库结构以字典集合
  '''
  sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
    FROM information_schema.`COLUMNS` 
    WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database)
  struts = {}
  for k in self.__primaryKey_dict.keys():
   self.__cursor.execute(sql.format(k))
   results = self.__cursor.fetchall()
   struts[k] = {}
   for result in results:
    struts[k][result[0]] = {}
    struts[k][result[0]]["COLUMN_NAME"] = result[0]
    struts[k][result[0]]["IS_NULLABLE"] = result[1]
    struts[k][result[0]]["COLUMN_TYPE"] = result[2]
    struts[k][result[0]]["COLUMN_KEY"] = result[3]
    struts[k][result[0]]["COLUMN_COMMENT"] = result[4]
  return self.__config, struts

  2.编写对比的Python脚本

'''
数据库迁移脚本, 目前支持一下几种功能:
1.生成旧数据库中没有的数据库表执行 SQL 脚本(支持是否带表数据),生成的 SQL 脚本在 temp 目录下(表名.sql)。
2.生成添加列 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
3.生成修改列属性 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
4.生成删除列 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
'''
import json, os, sys
from basedao import BaseDao

temp_path = sys.path[0] + "/temp"
if not os.path.exists(temp_path):
 os.mkdir(temp_path)

def main(old, new, has_data=False):
 '''
 @old 旧数据库(目标数据库)
 @new 最新的数据库(源数据库)
 @has_data 是否生成结构+数据的sql脚本 
 '''
 clear_temp() # 先清理 temp 目录
 old_config, old_struts = old
 new_config, new_struts = new
 for new_table, new_fields in new_struts.items():
  if old_struts.get(new_table) is None:
   gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data)
  else:
   cmp_table(old_struts[new_table], new_struts[new_table], new_table)

def cmp_table(old, new, table):
 '''
 对比表结构生成 sql
 '''
 old_fields = old
 new_fields = new

 sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
 sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
 sql_del_column = "ALTER TABLE `{TABLE}` DROP {COLUMN_NAME};"

 if old_fields != new_fields:
  f = open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8")
  content = ""
  for new_field, new_field_dict in new_fields.items():
   old_filed_dict = old_fields.get(new_field)
   if old_filed_dict is None:
    # 生成添加列 sql
    content += sql_add_column.format(TABLE=table, **new_field_dict)
   else:
    # 生成修改列 sql
    if old_filed_dict != new_field_dict:
     content += sql_change_column.format(TABLE=table, **new_field_dict)
    pass
  # 生成删除列 sql
  for old_field, old_field_dict in old_fields.items():
   if new_fields.get(old_field) is None:
    content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field)
    
  f.write(content)
  f.close()

def gc_sql(user, pwd, db, table, has_data):
 '''
 生成 sql 文件
 '''
 if has_data:
  sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
 else:
  sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
 os.system(sys_order)

def clear_temp():
 '''
 每次执行的时候调用这个,先清理下temp目录下面的旧文件
 '''
 if os.path.exists(temp_path):
  files = os.listdir(temp_path)
  for file in files:
   f = os.path.join(temp_path, file)
   if os.path.isfile(f):
    os.remove(f)
 print("临时文件目录清理完成")

if __name__ == "__main__":
 test1_config = {
  "user" : "root", 
  "password" : "root",
  "database" : "test1", 
 }
 test2_config = {
  "user" : "root", 
  "password" : "root",
  "database" : "test2", 
 }
 
 test1_dao = BaseDao(**test1_config)
 test1_struts = test1_dao.select_database_struts()
 
 test2_dao = BaseDao(**test2_config)
 test2_struts = test2_dao.select_database_struts()

 main(test2_struts, test1_struts)

  目前只支持了4种SQL脚本的生成。

总结

以上所述是小编给大家介绍的Python 实现数据库(SQL)更新脚本的生成方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

 类似资料:
  • 本文向大家介绍Python 实现数据库更新脚本的生成方法,包括了Python 实现数据库更新脚本的生成方法的使用技巧和注意事项,需要的朋友参考一下 我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼。因此我就试着用Pytho

  • 问题内容: 如何使用SQL(选择/存储过程/等)从SQL Server数据库生成所有表的DDL(带有外键,索引等)脚本?我需要除数据外的所有内容。 我 无法 使用Sql Server Management Studio,因为我想在将在Linux上运行的node.js脚本中使用它! 问题答案: 对于表:(您可以按以下方式工作) 程序如下:

  • 本文向大家介绍Python实现生成随机数据插入mysql数据库的方法,包括了Python实现生成随机数据插入mysql数据库的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python实现生成随机数据插入mysql数据库的方法。分享给大家供大家参考,具体如下: 运行结果: 实现代码: 可见数据库中插入的数据有随机用户名及其对应密码。 PS:这里再为大家推荐一款功能相似的在线工具供大家

  • 本文向大家介绍Python的mysql数据库的更新如何实现,包括了Python的mysql数据库的更新如何实现的使用技巧和注意事项,需要的朋友参考一下 Python的mysql数据库的更新           Python的mysql数据库的更新操作,在实际应用项目中会用到更新数据库,更新过程中可能会出现数据丢失或者数据错乱等系统性的问题,还希望大家正确操作, 一 代码 二 运行结果 以上就是Py

  • 问题内容: 首先,有一点背景。我有一套Java应用程序,有些基于JPA,有些则没有。为了创建数据库,我目前正在使用Hibernates模式导出来为使用JPA的用户生成创建脚本。那些不使用JPA的人会手工生成脚本。然后使用ANT在应用程序安装过程中运行它们。对于更新,应用程序安装程序只需将更新脚本应用于数据库。 为了改善数据库更新的管理,我一直在研究Flyway和Liquibase。两者似乎几乎都可

  • 本文向大家介绍python实现DEM数据的阴影生成的方法,包括了python实现DEM数据的阴影生成的方法的使用技巧和注意事项,需要的朋友参考一下 相关的依赖库在我的github网站上 首先贴出代码: 生成的结果如下:左边是DEM数据,右边是shadow 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。