当前位置: 首页 > 工具软件 > gh > 使用案例 >

gh-ost封装脚本

欧阳骏俊
2023-12-01

针对gh-ost封装了一个使用脚本,支持gh-ost的三种使用场景

#!/bin/env python
# -*- encoding: utf-8 -*-
# ----------------------------------------------
# Purpose:         gh-ost
# Created:         2018-06-16
# Modified:        hao.chen@woqutech.com
# Modified date:   2022-01-06
# ----------------------------------------------

import MySQLdb
import re
import sys
import time
import subprocess
import os
from optparse import OptionParser


def calc_time(func):
    def _deco(*args, **kwargs):
        begin_time = time.time()
        func(*args, **kwargs)
        cost_time = time.time() - begin_time
        print
        'cost time: %ss' % round(cost_time, 2)

    return _deco


def get_table_count(conn, dbname, tbname):
    query = ''' SELECT count(*) FROM %s.%s ''' % (dbname, tbname)
    cursor = conn.cursor()
    cursor.execute(query)
    row_nums = cursor.fetchone()
    cursor.close()
    conn.close()
    return row_nums


def online_ddl(conn, ddl_cmd):
    cursor = conn.cursor()
    cursor.execute(ddl_cmd)
    conn.commit()
    cursor.close()
    conn.close()


# @calc_time
def run_cmd(cmd):
    p = subprocess.Popen(cmd, shell=True)
    return p, p.pid


def drop_ghost_table(conn, ghost_name_list):
    try:
        cursor = conn.cursor()
        query = ''' DROP TABLE IF EXISTS %s; ''' % (ghost_name_list)
        cursor.execute(query)
        conn.commit()
        cursor.close()
        conn.close()
    except Exception, e:
        print
        e


def getGHCMD(mode):
    if mode == 1:
        print "...."
        cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--allow-on-master \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000  \
--initially-drop-ghost-table  \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee  rebuild_t1.log''' % (

            options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
            options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
            throttle_flag, DDL_CMD)
        return cmd
    elif mode == 2:
        cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000  \
--initially-drop-ghost-table  \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee  rebuild_t1.log''' % (
            options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
            options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
            throttle_flag, DDL_CMD)
        return cmd
    elif mode == 3:
        cmd = '''gh-ost \
--user="%s" \
--password="%s" \
--host=%s \
--port=%s --database="%s" --table="%s" \
--max-load='Threads_running=%d' \
--chunk-size=%d \
--serve-socket-file=%s \
--panic-flag-file=%s \
--throttle-additional-flag-file=%s \
--alter="%s" \
--heartbeat-interval-millis=2000  \
--initially-drop-ghost-table  \
--initially-drop-old-table \
--cut-over=default \
--cut-over-lock-timeout-seconds=1 \
--assume-rbr \
--concurrent-rowcount \
--default-retries=10 \
--test-on-replica \
--postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \
--execute | tee  rebuild_t1.log''' % (
            options.mysqluser, options.mysqlpassword, options.mysqlhost, options.port, options.dbname,
            options.tablename, Threads_running, chunk_size, gh_ost_socket, panic_flag,
            throttle_flag, DDL_CMD)
        return cmd



if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-P", "--Port", help="Port for search", dest="port")
    parser.add_option("-D", "--Dbname", help="the Dbname to use", dest="dbname")
    parser.add_option("-T", "--Table", help="the Table to use", dest="tablename")
    parser.add_option("-u", "--user", help="mysql user", dest="mysqluser")
    parser.add_option("-p", "--password", help="mysql user password", dest="mysqlpassword")
    parser.add_option("-H", "--host", help="mysql host", dest="mysqlhost")

    (options, args) = parser.parse_args()

    if not options.port:
        print
        'params port need to apply'
        exit()

    if not options.dbname:
        print
        'params dbname need to apply'
        exit()

    if not options.tablename:
        print
        'params tablename need to apply'
        exit()

    gh_ost_socket = '/tmp/gh-ost.%s.%s.sock' % (options.dbname, options.tablename)
    # 终止标志
    panic_flag = '/tmp/gh-ost.panic.%s.%s.flag' % (options.dbname, options.tablename)
    # postpone_flag   =  '/tmp/gh-ost.postpone.%s.%s.flag' %(options.dbname,options.tablename)
    # 暂停标志
    throttle_flag = '/tmp/gh-ost.throttle.%s.%s' % (options.dbname, options.tablename)
    #    socket = '/data/%s/tmp/mysql.sock' %(options.port)
    socket = '/var/run/mysqld/mysqld.sock'

    get_conn = MySQLdb.connect(host=options.mysqlhost, port=int(options.port), user=options.mysqluser,
                               passwd=options.mysqlpassword,
                               db=options.dbname, unix_socket=socket, charset='utf8')
    conn = MySQLdb.connect(host=options.mysqlhost, port=int(options.port), user=options.mysqluser,
                           passwd=options.mysqlpassword,
                           db=options.dbname, unix_socket=socket, charset='utf8')

    (table_count,) = get_table_count(get_conn, options.dbname, options.tablename)
    print("\033[0;32m%s\033[0m" % "表的数量:%s" % table_count)

    mode_description = '''
从以下选项中选择你要操作的DDL模式
[1] 在主从的master或者单实例执行DDL 
    (行数据会在主库读写)
    (读取主库的binlog日志,将变更应用到主库的 xxx_gho表)
    (在主库收集表格式,字段&索引,行数等信息)
    (在主库上读取内部的变更事件(如心跳事件))
    (在主库切换表)
[2] 主从环境,连接从库,读取binlog,在主库做DDL
    (行数据在主库上读写)
    (读取从库的二进制日志,将变更应用到主库)
    (在从库收集表格式,字段&索引,行数等信息)
    (在从库上读取内部的变更事件(如心跳事件))
    (在主库切换表)
[3] 在从库上进行DDL测试:
输入 [选项]
    '''
    GH_OST_MODE = raw_input(mode_description)
    print "You choose mode [%s]" % GH_OST_MODE
    DDL_CMD = raw_input('Enter DDL CMD   : ').replace('`', '')

    gh_command_list = re.split('[ ]+', DDL_CMD)
    if gh_command_list[0].upper() == 'CHANGE' and gh_command_list[1] != gh_command_list[2]:
        print("\033[0;31m%s\033[0m" % "renamed columns' data will be lost,pt-osc exit...")
        exit()

    if table_count <= 10000:
        ddl = ''' ALTER TABLE %s %s ''' % (options.tablename, DDL_CMD)
        print("\033[0;36m%s\033[0m" % ddl)
        print("\033[0;32m%s\033[0m" % "online ddl ...")
        online_ddl(conn, ddl)
        print("\033[0;32m%s\033[0m" % "执行完成 ...")
        exit()

    else:
        MAX_LOAD = raw_input('Enter Max Threads_running【25】 : ')
        if not MAX_LOAD:
            Threads_running = 25
        else:
            try:
                Threads_running = int(MAX_LOAD)
            except ValueError:
                print("\033[0;31m%s\033[0m" % "输入类型错误,退出...")
                exit()

        CHUNK_SIZE = raw_input('Enter Max chunk-size【1000】    : ')
        if not CHUNK_SIZE:
            chunk_size = 1000
        else:
            try:
                chunk_size = int(CHUNK_SIZE)
            except ValueError:
                print("\033[0;31m%s\033[0m" % "输入类型错误,退出...")
                exit()

        print("\033[0;32m%s\033[0m" % "gh-ost ddl ...")
        # --postpone-cut-over-flag-file=%s  有这个参数需要配合 echo "unpostpone" |nc -U /tmp/gh-ost.qdump.sbtest1.sock 使用来完成cut over 否则cut over 一直等待

        # --allow-on-master \ 
        #### 测试场景1 在master/单节点上做DDL
        #    在master节点做的话需要加上这个参数,并指定连接方式为master的uri    

        #### 测试场景2 连接到从库 在主库做迁移(需要保证从库的binlog为ROW模式,主库不需要保证)
        #    不加此参数表示默认在slave节点获取binlog,在slave节点做的话需要去掉这个参数,并指定连接方式为slave的uri
        #       1、行数据在主库上读写
        #       2、读取从库的二进制日志,将变更应用到主库
        #       3、在从库收集表格式,字段&索引,行数等信息
        #       4、在从库上读取内部的变更事件(如心跳事件)
        #       5、在主库切换表
        #  在执行DDL中,从库会执行一次stop/start slave,
        #  要是确定从的binlog是ROW的话可以添加参数:--assume-rbr。
        #  如果从库的binlog不是ROW,可以用参数--switch-to-rbr来转换成ROW,此时需要注意的是执行完毕之后,binlog模式不会被转换成原来的值。
        #  --assume-rbr和--switch-to-rbr参数不能一起使用。

        #### 测试场景3 在从上进行DDL测试:
        # 需添加参数--test-on-replica:
        # 在从库上测试gh-ost,包括在从库上数据迁移(migration),数据迁移完成后stop slave,原表和ghost表立刻交换而后立刻交换回来。
        # 继续保持stop slave,使你可以对比两张表。如果不想stop slave,则可以再添加参数:--test-on-replica-skip-replica-stop
        # 上面三种是gh-ost操作模式,上面的操作中,到最后不会清理临时表,需要手动清理,再下次执行之前果然临时表还存在,则会执行失败,可以通过参数进行删除:
        # --initially-drop-ghost-table:gh-ost操作之前,检查并删除已经存在的ghost表。该参数不建议使用,请手动处理原来存在的ghost表。默认不启用该参数,gh-ost直接退出操作。
        # --initially-drop-old-table:gh-ost操作之前,检查并删除已经存在的旧表。该参数不建议使用,请手动处理原来存在的ghost表。默认不启用该参数,gh-ost直接退出操作。
        # --initially-drop-socket-file:gh-ost强制删除已经存在的socket文件。该参数不建议使用,可能会删除一个正在运行的gh-ost程序,导致DDL失败。
        # --ok-to-drop-table:gh-ost操作结束后,删除旧表,默认状态是不删除旧表,会存在_tablename_del表。
        # 注意 可以多次执行测试 ddl

        # 以上三个测试场景 均通过 场景3 执行完之后会stop slave 然后方便DBA对比 sbtest1 _sbtest1_gho的数据是否一致 用于测试
        gh_command = getGHCMD(int(GH_OST_MODE))
        print("\033[0;36m%s\033[0m" % gh_command)

        child, pid = run_cmd(gh_command)
        print("\033[0;31mgh-ost's PID:%s\033[0m" % pid)
        print("\033[0;33m创建:【touch %s】文件,暂停DDL ...\033[0m" % throttle_flag)
        try:
            child.wait()
        except:
            child.terminate()
            # clean
            ghost_name_list = '_%s_ghc,_%s_gho' % (options.tablename, options.tablename)
            drop_ghost_table(conn, ghost_name_list)
            if os.path.exists(gh_ost_socket):
                os.system('rm -r %s' % gh_ost_socket)
                print("\033[0;32m%s\033[0m" % "清理完成 ...")
                exit()
            print("\033[0;32m%s\033[0m" % "清理完成 ...")
            exit()
        finally:
            pass

 类似资料: