针对gh-ost封装了一个使用脚本,支持gh-ost的三种使用场景
#!/bin/env python
# -*- encoding: utf-8 -*-
# ----------------------------------------------
# Purpose: gh-ost
# Created: 2018-06-16
# Modified: hao.chen@woqutech.com
# Modified date: 2022-01-06
# ----------------------------------------------
import MySQLdb
import re
import sys
import time
import subprocess
import os
from optparse import OptionParser
def calc_time(func):
def _deco(*args, **kwargs):
begin_time = time.time()
func(*args, **kwargs)
cost_time = time.time() - begin_time
print
'cost time: %ss' % round(cost_time, 2)
return _deco
def get_table_count(conn, dbname, tbname):
query = ''' SELECT count(*) FROM %s.%s ''' % (dbname, tbname)
cursor = conn.cursor()
cursor.execute(query)
row_nums = cursor.fetchone()
cursor.close()
conn.close()
return row_nums
def online_ddl(conn, ddl_cmd):
cursor = conn.cursor()
cursor.execute(ddl_cmd)
conn.commit()
cursor.close()
conn.close()
# @calc_time
def run_cmd(cmd):
p = subprocess.Popen(cmd, shell=True)
return p, p.pid
def drop_ghost_table(conn, ghost_name_list):
try:
cursor = conn.cursor()
query = ''' DROP TABLE IF EXISTS %s; ''' % (ghost_name_list)
cursor.execute(query)
conn.commit()
cursor.close()
conn.close()
except Exception, e:
print
e
def getGHCMD(mode):
if mode == 1:
print "...."
cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--allow-on-master \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000 \
--initially-drop-ghost-table \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee rebuild_t1.log''' % (
options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
throttle_flag, DDL_CMD)
return cmd
elif mode == 2:
cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000 \
--initially-drop-ghost-table \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee rebuild_t1.log''' % (
options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
throttle_flag, DDL_CMD)
return cmd
elif mode == 3:
cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000 \
--initially-drop-ghost-table \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--test-on-replica \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee rebuild_t1.log''' % (
options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
throttle_flag, DDL_CMD)
return cmd
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-P", "--Port", help="Port for search", dest="port")
parser.add_option("-D", "--Dbname", help="the Dbname to use", dest="dbname")
parser.add_option("-T", "--Table", help="the Table to use", dest="tablename")
parser.add_option("-u", "--user", help="mysql user", dest="mysqluser")
parser.add_option("-p", "--password", help="mysql user password", dest="mysqlpassword")
parser.add_option("-H", "--host", help="mysql host", dest="mysqlhost")
(options, args) = parser.parse_args()
if not options.port:
print
'params port need to apply'
exit()
if not options.dbname:
print
'params dbname need to apply'
exit()
if not options.tablename:
print
'params tablename need to apply'
exit()
gh_ost_socket = '/tmp/gh-ost.%s.%s.sock' % (options.dbname, options.tablename)
# 终止标志
panic_flag = '/tmp/gh-ost.panic.%s.%s.flag' % (options.dbname, options.tablename)
# postpone_flag = '/tmp/gh-ost.postpone.%s.%s.flag' %(options.dbname,options.tablename)
# 暂停标志
throttle_flag = '/tmp/gh-ost.throttle.%s.%s' % (options.dbname, options.tablename)
# socket = '/data/%s/tmp/mysql.sock' %(options.port)
socket = '/var/run/mysqld/mysqld.sock'
get_conn = MySQLdb.connect(host=options.mysqlhost, port=int(options.port), user=options.mysqluser,
passwd=options.mysqlpassword,
db=options.dbname, unix_socket=socket, charset='utf8')
conn = MySQLdb.connect(host=options.mysqlhost, port=int(options.port), user=options.mysqluser,
passwd=options.mysqlpassword,
db=options.dbname, unix_socket=socket, charset='utf8')
(table_count,) = get_table_count(get_conn, options.dbname, options.tablename)
print("\033[0;32m%s\033[0m" % "表的数量:%s" % table_count)
mode_description = '''
从以下选项中选择你要操作的DDL模式
[1] 在主从的master或者单实例执行DDL
(行数据会在主库读写)
(读取主库的binlog日志,将变更应用到主库的 xxx_gho表)
(在主库收集表格式,字段&索引,行数等信息)
(在主库上读取内部的变更事件(如心跳事件))
(在主库切换表)
[2] 主从环境,连接从库,读取binlog,在主库做DDL
(行数据在主库上读写)
(读取从库的二进制日志,将变更应用到主库)
(在从库收集表格式,字段&索引,行数等信息)
(在从库上读取内部的变更事件(如心跳事件))
(在主库切换表)
[3] 在从库上进行DDL测试:
输入 [选项]
'''
GH_OST_MODE = raw_input(mode_description)
print "You choose mode [%s]" % GH_OST_MODE
DDL_CMD = raw_input('Enter DDL CMD : ').replace('`', '')
gh_command_list = re.split('[ ]+', DDL_CMD)
if gh_command_list[0].upper() == 'CHANGE' and gh_command_list[1] != gh_command_list[2]:
print("\033[0;31m%s\033[0m" % "renamed columns' data will be lost,pt-osc exit...")
exit()
if table_count <= 10000:
ddl = ''' ALTER TABLE %s %s ''' % (options.tablename, DDL_CMD)
print("\033[0;36m%s\033[0m" % ddl)
print("\033[0;32m%s\033[0m" % "online ddl ...")
online_ddl(conn, ddl)
print("\033[0;32m%s\033[0m" % "执行完成 ...")
exit()
else:
MAX_LOAD = raw_input('Enter Max Threads_running【25】 : ')
if not MAX_LOAD:
Threads_running = 25
else:
try:
Threads_running = int(MAX_LOAD)
except ValueError:
print("\033[0;31m%s\033[0m" % "输入类型错误,退出...")
exit()
CHUNK_SIZE = raw_input('Enter Max chunk-size【1000】 : ')
if not CHUNK_SIZE:
chunk_size = 1000
else:
try:
chunk_size = int(CHUNK_SIZE)
except ValueError:
print("\033[0;31m%s\033[0m" % "输入类型错误,退出...")
exit()
print("\033[0;32m%s\033[0m" % "gh-ost ddl ...")
# --postpone-cut-over-flag-file=%s 有这个参数需要配合 echo "unpostpone" |nc -U /tmp/gh-ost.qdump.sbtest1.sock 使用来完成cut over 否则cut over 一直等待
# --allow-on-master \
#### 测试场景1 在master/单节点上做DDL
# 在master节点做的话需要加上这个参数,并指定连接方式为master的uri
#### 测试场景2 连接到从库 在主库做迁移(需要保证从库的binlog为ROW模式,主库不需要保证)
# 不加此参数表示默认在slave节点获取binlog,在slave节点做的话需要去掉这个参数,并指定连接方式为slave的uri
# 1、行数据在主库上读写
# 2、读取从库的二进制日志,将变更应用到主库
# 3、在从库收集表格式,字段&索引,行数等信息
# 4、在从库上读取内部的变更事件(如心跳事件)
# 5、在主库切换表
# 在执行DDL中,从库会执行一次stop/start slave,
# 要是确定从的binlog是ROW的话可以添加参数:--assume-rbr。
# 如果从库的binlog不是ROW,可以用参数--switch-to-rbr来转换成ROW,此时需要注意的是执行完毕之后,binlog模式不会被转换成原来的值。
# --assume-rbr和--switch-to-rbr参数不能一起使用。
#### 测试场景3 在从上进行DDL测试:
# 需添加参数--test-on-replica:
# 在从库上测试gh-ost,包括在从库上数据迁移(migration),数据迁移完成后stop slave,原表和ghost表立刻交换而后立刻交换回来。
# 继续保持stop slave,使你可以对比两张表。如果不想stop slave,则可以再添加参数:--test-on-replica-skip-replica-stop
# 上面三种是gh-ost操作模式,上面的操作中,到最后不会清理临时表,需要手动清理,再下次执行之前果然临时表还存在,则会执行失败,可以通过参数进行删除:
# --initially-drop-ghost-table:gh-ost操作之前,检查并删除已经存在的ghost表。该参数不建议使用,请手动处理原来存在的ghost表。默认不启用该参数,gh-ost直接退出操作。
# --initially-drop-old-table:gh-ost操作之前,检查并删除已经存在的旧表。该参数不建议使用,请手动处理原来存在的ghost表。默认不启用该参数,gh-ost直接退出操作。
# --initially-drop-socket-file:gh-ost强制删除已经存在的socket文件。该参数不建议使用,可能会删除一个正在运行的gh-ost程序,导致DDL失败。
# --ok-to-drop-table:gh-ost操作结束后,删除旧表,默认状态是不删除旧表,会存在_tablename_del表。
# 注意 可以多次执行测试 ddl
# 以上三个测试场景 均通过 场景3 执行完之后会stop slave 然后方便DBA对比 sbtest1 _sbtest1_gho的数据是否一致 用于测试
gh_command = getGHCMD(int(GH_OST_MODE))
print("\033[0;36m%s\033[0m" % gh_command)
child, pid = run_cmd(gh_command)
print("\033[0;31mgh-ost's PID:%s\033[0m" % pid)
print("\033[0;33m创建:【touch %s】文件,暂停DDL ...\033[0m" % throttle_flag)
try:
child.wait()
except:
child.terminate()
# clean
ghost_name_list = '_%s_ghc,_%s_gho' % (options.tablename, options.tablename)
drop_ghost_table(conn, ghost_name_list)
if os.path.exists(gh_ost_socket):
os.system('rm -r %s' % gh_ost_socket)
print("\033[0;32m%s\033[0m" % "清理完成 ...")
exit()
print("\033[0;32m%s\033[0m" % "清理完成 ...")
exit()
finally:
pass