本文主要研究一下kingbus的binlog_server_handler.go
kingbus/api/binlog_server_handler.go
//StartBinlogServer implements start a binlog server
func (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error {
h.l.Lock()
defer h.l.Unlock()
var args config.BinlogServerConfig
var err error
defer func() {
if err != nil {
log.Log.Errorf("StartBinlogServer error,err: %s", err)
echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
}()
err = echoCtx.Bind(&args)
if err != nil {
return err
}
kingbusIP := h.svr.GetIP()
//check args
err = args.Check(kingbusIP)
if err != nil {
return err
}
//start syncer server
err = h.svr.StartServer(config.BinlogServerType, &args)
if err != nil {
log.Log.Errorf("start server error,err:%s,args:%v", err, args)
return err
}
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
复制代码
kingbus/server/server.go
//StartServer start sub servers:syncer server or binlog master server
func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error {
var err error
switch svrType {
case config.SyncerServerType:
if s.IsSyncerStarted() {
return ErrStarted
}
syncerArgs, ok := args.(*config.SyncerArgs)
if !ok {
log.Log.Errorf("StartServer args is illegal,args:%v", args)
return ErrArgs
}
err = s.startSyncerServer(syncerArgs)
if err != nil {
log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerArgs)
return ErrArgs
}
//start to propose binlog event to raft cluster
s.StartProposeBinlog(s.syncer.ctx)
log.Log.Debugf("start syncer,and propose!!!")
return nil
case config.BinlogServerType:
if s.IsBinlogServerStarted() {
return ErrStarted
}
masterArgs, ok := args.(*config.BinlogServerConfig)
if !ok {
log.Log.Errorf("StartServer args is illegal,args:%v", args)
return ErrArgs
}
err = s.startMasterServer(masterArgs)
if err != nil {
log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterArgs)
return ErrArgs
}
return nil
default:
log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType)
}
return nil
}
复制代码
kingbus/api/binlog_server_handler.go
//StopBinlogServer implements stop binlog server
func (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error {
h.l.Lock()
defer h.l.Unlock()
h.svr.StopServer(config.BinlogServerType)
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
复制代码
kingbus/server/server.go
//StopServer stop sub server
func (s *KingbusServer) StopServer(svrType config.SubServerType) {
switch svrType {
case config.SyncerServerType:
if s.IsSyncerStarted() {
s.syncer.Stop()
}
case config.BinlogServerType:
if s.IsBinlogServerStarted() {
s.master.Stop()
}
default:
log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
}
}
复制代码
kingbus/api/binlog_server_handler.go
//GetBinlogServerStatus implements get binlog server status in the runtime state
func (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error {
h.l.Lock()
defer h.l.Unlock()
status := h.svr.GetServerStatus(config.BinlogServerType)
if masterStatus, ok := status.(*config.BinlogServerStatus); ok {
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus))
}
return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
}
复制代码
//GetServerStatus get the sub server status
func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} {
switch svrType {
case config.SyncerServerType:
var syncerStatus config.SyncerStatus
if s.IsSyncerStarted() {
cfg := s.syncer.cfg
syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
syncerStatus.MysqlUser = cfg.User
syncerStatus.MysqlPassword = cfg.Password
syncerStatus.SemiSync = cfg.SemiSyncEnabled
syncerStatus.Status = config.ServerRunningStatus
syncerStatus.CurrentGtid = s.CurrentGtidStr()
syncerStatus.LastBinlogFile = s.LastBinlogFile()
syncerStatus.LastFilePosition = s.LastFilePosition()
syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr()
purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
if err != nil {
log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
}
syncerStatus.PurgedGtidSet = purgedGtids.String()
} else {
syncerStatus.Status = config.ServerStoppedStatus
}
return &syncerStatus
case config.BinlogServerType:
var status config.BinlogServerStatus
if s.IsBinlogServerStarted() {
cfg := s.master.cfg
status.Addr = cfg.Addr
status.User = cfg.User
status.Password = cfg.Password
status.Slaves = make([]*mysql.Slave, 0, 2)
slaves := s.master.GetSlaves()
for _, s := range slaves {
status.Slaves = append(status.Slaves, s)
}
status.CurrentGtid = s.CurrentGtidStr()
status.LastBinlogFile = s.LastBinlogFile()
status.LastFilePosition = s.LastFilePosition()
status.ExecutedGtidSet = s.ExecutedGtidSetStr()
purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
if err != nil {
log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
}
status.PurgedGtidSet = purgedGtids.String()
status.Status = config.ServerRunningStatus
} else {
status.Status = config.ServerStoppedStatus
}
return &status
default:
log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
}
return nil
}
复制代码
kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus,他们均委托给了server.go的对应方法