目录
本文基于 kapacitor 默认分支 master 的代码,克隆命令如下:
git clone https://github.com/influxdata/kapacitor.git
代码位置:/github.com/influxdata/kapacitor/cmd/kapacitord/main.go
// Run determines and runs the command specified by the CLI args.
// Excerpt from cmd/kapacitord/main.go; "..." marks code omitted in this walkthrough.
func (m *Main) Run(args ...string) error {
// Split the command name off the command-line arguments.
name, args := ParseCommandName(args)
// Extract name from args.
switch name {
// An empty command name or "run" starts the server.
case "", "run":
// Create a new run command.
cmd := run.NewCommand()
// Tell the server the build details.
cmd.Version = version
cmd.Commit = commit
cmd.Branch = branch
cmd.Platform = "OSS"
// Execute the run command; err is handled in the elided code below.
err := cmd.Run(args...)
...
// "config": run the print-config command.
case "config":
...
// Execute the print-config command's Run.
if err := run.NewPrintConfigCommand().Run(args...); err != nil {
return fmt.Errorf("config: %s", err)
}
// "version": run the version command.
case "version":
if err := NewVersionCommand().Run(args...); err != nil {
return fmt.Errorf("version: %s", err)
}
// "help": run the help command.
case "help":
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help: %s", err)
}
default:
// Any other command is unrecognized and should be reported as an error
// (the error-returning code is omitted from this excerpt).
}
}
下面具体看一下 run 命令的 Run() 方法都做了什么:
代码位置:/github.com/influxdata/kapacitor/cmd/kapacitord/run/command.go
// Run parses the config from args and runs the server.
// Excerpt from cmd/kapacitord/run/command.go; "..." marks code omitted in this walkthrough.
func (cmd *Command) Run(args ...string) error {
// Parse the command-line flags.
...
// Print the Kapacitor logo.
...
// Locate and parse the Kapacitor configuration file.
...
// Handle environment variables carrying the KAPACITOR prefix
// (presumably applied as config overrides; code elided).
...
// Validate hostname, logFile and logLevel from the configuration.
...
// Initialize the logging service.
...
// Initialize the command's diagnostic handler.
...
// Log that the server is starting.
...
// Create the PID file at the configured pidFile path.
...
// Build the server instance from the configuration.
s, err := server.New(config, buildInfo, cmd.diagService)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
...
// Open (start) the server.
if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}
...
// Watch the server's error channel in a separate goroutine.
// Begin monitoring the server's error channel.
go cmd.monitorServerErrors()
...
}
代码位置:/github.com/influxdata/kapacitor/server/server.go
// New returns a new instance of Server built from a config.
// Excerpt from server/server.go; "..." marks code omitted in this walkthrough.
func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Server, error) {
// Validate the configuration.
...
...
// Create the main TaskMaster and register it with the TaskMasterLookup.
s.TaskMasterLookup = kapacitor.NewTaskMasterLookup()
kd := diagService.NewKapacitorHandler()
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, vars.Info, kd)
s.TaskMaster.DefaultRetentionPolicy = c.DefaultRetentionPolicy
s.TaskMaster.Commander = s.Commander
s.TaskMasterLookup.Set(s.TaskMaster)
// Open the TaskMaster before any services are created.
if err := s.TaskMaster.Open(); err != nil {
return nil, err
}
...
// With the TaskMaster open, the services are created and appended one by one.
// NOTE(review): append order presumably determines the order in which
// startServices opens them; confirm AppendService appends to s.Services.
// ===== Kapacitor's own services =====
// Initialize the HTTPD service.
s.initHTTPDService()
// Append the storage service.
s.appendStorageService()
// Append the auth service.
s.appendAuthService()
// Append the config-override service (corresponds to the [config-override]
// section of the configuration file).
s.appendConfigOverrideService()
// Append the tester service.
s.appendTesterService()
// Append the sideload service.
s.appendSideloadService()
// =========== Alert services ===========
// Initialize the alert service.
s.initAlertService()
// =========== Dynamic services ===========
// Append the UDF (user-defined function) service.
s.appendUDFService()
// Append the deadman service.
s.appendDeadmanService()
// Append the InfluxDB service.
if err := s.appendInfluxDBService(); err != nil {
return nil, errors.Wrap(err, "influxdb service")
}
// Append the load service (presumably the s.LoadService that Server.Open
// uses to load tasks/templates/handlers).
if err := s.appendLoadService(); err != nil {
return nil, errors.Wrap(err, "load service")
}
// Many more services follow (elided).
....
}
// appendInfluxDBService builds the InfluxDB service, wires in its
// dependencies, and registers it under the name "influxdb" both as a
// dynamic service and as a regular service. Part of the setup is elided.
func (s *Server) appendInfluxDBService() error {
...
srv, err := influxdb.NewService(c, httpPort, s.config.Hostname, vars.Info, s.config.HTTP.AuthEnabled, d)
if err != nil {
return err
}
w, err := s.newClusterIDChangedWaiter()
if err != nil {
return err
}
// When set, Service.Open starts a goroutine that waits for cluster ID changes.
srv.ClusterIDWaiter = w
srv.HTTPDService = s.HTTPDService
// The TaskMaster acts as the points writer (presumably the sink for points
// the service receives; confirm in the influxdb package).
srv.PointsWriter = s.TaskMaster
srv.AuthService = s.AuthService
srv.ClientCreator = iclient.ClientCreator{}
s.InfluxDBService = srv
// Give the TaskMaster a reference to the InfluxDB service.
s.TaskMaster.InfluxDBService = srv
// Register as a dynamic service (presumably so startServices can apply
// config overrides to it via Update).
s.SetDynamicService("influxdb", srv)
s.AppendService("influxdb", srv)
return nil
}
代码位置:/github.com/influxdata/kapacitor/server/server.go
// Open opens all the services.
//
// It starts CPU/memory profiling when configured and opens every registered
// service. It then (re)loads tasks, templates and handlers through the
// LoadService. Finally it launches the goroutines that watch the services
// and configuration updates.
//
// If opening the services or loading fails, the server is closed before
// the error is returned. This prevents services that were already opened
// from being leaked.
func (s *Server) Open() error {
	// Start profiling, if set.
	if err := s.startProfile(s.CPUProfile, s.MemProfile); err != nil {
		return err
	}

	// Open every service registered on the server.
	if err := s.startServices(); err != nil {
		s.Close()
		return err
	}

	if err := s.LoadService.Load(); err != nil {
		// The services are already open at this point. Mirror the
		// startServices failure path and shut them down rather than
		// leaving them running behind a failed Open.
		s.Close()
		return fmt.Errorf("failed to reload tasks/templates/handlers: %w", err)
	}

	go s.watchServices()
	go s.watchConfigUpdates()
	return nil
}
代码位置:/github.com/influxdata/kapacitor/server/server.go
// startServices opens every registered service, in order, by calling its
// Open method. What Open does depends on each service's implementation.
//
// Config overrides are applied right after the ConfigOverrideService has
// been opened, and before any later service is opened. This happens unless
// overrides are skipped or disabled in the configuration. Each stored
// override is passed to the matching dynamic service's Update method. An
// override naming a service with no registered dynamic service is an error.
func (s *Server) startServices() error {
	for _, service := range s.Services {
		s.Diag.Debug("opening service", keyvalue.KV("service", fmt.Sprintf("%T", service)))
		if err := service.Open(); err != nil {
			return fmt.Errorf("open service %T: %w", service, err)
		}
		s.Diag.Debug("opened service", keyvalue.KV("service", fmt.Sprintf("%T", service)))

		// Apply config overrides after the config override service has been opened and before any dynamic services.
		if service != s.ConfigOverrideService || s.config.SkipConfigOverrides || !s.config.ConfigOverride.Enabled {
			continue
		}

		// Apply initial config updates
		s.Diag.Debug("applying config overrides")
		configs, err := s.ConfigOverrideService.Config()
		if err != nil {
			return errors.Wrap(err, "failed to apply config overrides")
		}
		// Distinct names avoid shadowing the outer loop's service and the
		// server's s.config.
		for name, cfg := range configs {
			srv, ok := s.DynamicServices[name]
			if !ok {
				return fmt.Errorf("found configuration override for unknown service %q", name)
			}
			s.Diag.Debug("applying config overrides for service", keyvalue.KV("service", name))
			if err := srv.Update(cfg); err != nil {
				return errors.Wrapf(err, "failed to update configuration for service %s", name)
			}
		}
	}
	return nil
}
代码位置:/github.com/influxdata/kapacitor/services/influxdb/service.go
// Open starts the InfluxDB service. It is a no-op when the service is
// already open.
//
// For every configured cluster, it assigns this service to the cluster and
// opens the cluster. It then registers the POST subscriptions route on the
// HTTPD service and revokes tokens belonging to removed clusters. When a
// ClusterIDWaiter is set, it also starts a goroutine, tracked by s.wg, that
// waits for cluster ID changes.
//
// NOTE(review): s.opened is set before any cluster is opened, so if a
// cluster fails to open, a later call to Open returns nil without retrying.
func (s *Service) Open() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.opened {
		return nil
	}
	s.opened = true

	for _, c := range s.clusters {
		s.assignServiceToCluster(c)
		if err := c.Open(); err != nil {
			return err
		}
	}

	// A POST to the subscriptions path refreshes the subscription linking
	// for all clusters.
	subscriptionsRoute := httpd.Route{
		Method:      "POST",
		Pattern:     subscriptionsPath,
		HandlerFunc: s.handleSubscriptions,
	}
	s.routes = []httpd.Route{subscriptionsRoute}
	if err := s.HTTPDService.AddRoutes(s.routes); err != nil {
		return errors.Wrap(err, "adding API routes")
	}

	// Revoke any tokens for removed clusters.
	if err := s.revokeClusterTokens(); err != nil {
		return errors.Wrap(err, "revoking old cluster tokens")
	}

	if s.ClusterIDWaiter == nil {
		return nil
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		// Per the walkthrough's notes, this ends up calling LinkSubscriptions
		// (not visible here).
		s.waitForClusterIDChanges()
	}()
	return nil
}
// Open opens the InfluxDB cluster. It is a no-op when the cluster is
// already open.
//
// It creates the InfluxDB client from c.influxdbConfig and calls watchSubs.
// It then links subscriptions on startup using c.subName, with a context
// from setupContext that is cancelled when Open returns.
//
// NOTE(review): c.opened is set before the client is created, so a failed
// Open is not retried by a later call.
func (c *influxdbCluster) Open() error {
	ctx, cancel := c.setupContext()
	defer cancel()

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.opened {
		return nil
	}
	c.opened = true

	// Build the InfluxDB client from the cluster's configuration.
	cli, err := c.ClientCreator.Create(c.influxdbConfig)
	if err != nil {
		return errors.Wrap(err, "failed to create client")
	}
	c.client = cli

	// NOTE(review): watchSubs is not shown here; presumably it starts
	// watching subscriptions. Confirm.
	c.watchSubs()

	// linkSubscriptions must be called with c.mu held, which it is here.
	if err := c.linkSubscriptions(ctx, c.subName); err != nil {
		return errors.Wrap(err, "failed to link subscription on startup")
	}
	return nil
}
代码位置:/github.com/influxdata/kapacitor/services/influxdb/service.go
// linkSubscriptions links this cluster's subscriptions.
// The caller must hold the cluster lock (c.mu); Open calls it with the lock held.
// Excerpt: the body is elided in this walkthrough.
func (c *influxdbCluster) linkSubscriptions(ctx context.Context, subName string) error {
...
// Fetch all databases in the cluster together with their retention policies.
...
// Fetch the subscriptions that already exist in the cluster.
...
// Merge the new subscriptions into the existing ones.
...
// Create and start the new subscriptions.
...
// Validate tokens.
...
// Close subscriptions that have been dropped.
...
}
接下来看一下启动 TaskMaster 的 Open() 方法(它在 server.New 中通过 s.TaskMaster.Open() 被调用):
代码位置:/github.com/influxdata/kapacitor/task_master.go
// Open starts the TaskMaster. It returns ErrTaskMasterOpen if the
// TaskMaster is already open.
//
// Opening clears the closed and drained flags. It then creates the
// internal "write_points" stream edge via stream and stores the resulting
// StreamCollector in tm.writePointsIn. If creating that stream fails, the
// TaskMaster is marked closed again and the error is returned.
func (tm *TaskMaster) Open() (err error) {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	if !tm.closed {
		return ErrTaskMasterOpen
	}
	tm.closed = false
	tm.drained = false

	// Points written to the TaskMaster flow through a stream edge.
	tm.writePointsIn, err = tm.stream("write_points")
	if err != nil {
		// Roll back so the TaskMaster is not left half-open.
		tm.closed = true
		return err
	}
	tm.diag.TaskMasterOpened()
	return nil
}
TaskMaster.Open() 中调用了 stream("write_points"),下面看一下 stream() 方法的实现:
代码位置:/github.com/influxdata/kapacitor/task_master.go
// stream creates a stream edge with the given name, owned by this
// TaskMaster. It also starts a goroutine, tracked by tm.wg, that forwards
// every point emitted on that edge to runForking. It returns
// ErrTaskMasterClosed when the TaskMaster is closed.
//
// NOTE(review): tm.closed is read without taking tm.mu here. The visible
// caller (Open) already holds the lock; confirm that any other callers do
// too.
func (tm *TaskMaster) stream(name string) (StreamCollector, error) {
	if tm.closed {
		return nil, ErrTaskMasterClosed
	}

	owner := fmt.Sprintf("task_master:%s", tm.id)
	edgeDiag := tm.diag.WithEdgeContext(owner, name, "stream")
	// The fourth argument is the pipeline edge type; this edge carries a stream.
	rawEdge := newEdge(owner, name, "stream", pipeline.StreamEdge, defaultEdgeBufferSize, edgeDiag)
	collector := &streamEdge{edge: rawEdge}

	tm.wg.Add(1)
	go func() {
		defer tm.wg.Done()
		tm.runForking(collector)
	}()
	return collector, nil
}
// runForking drains the given stream edge, passing every point it emits to
// forkPoint. It returns once EmitPoint reports that no more points are
// available.
func (tm *TaskMaster) runForking(in StreamEdge) {
	for {
		p, ok := in.EmitPoint()
		if !ok {
			return
		}
		tm.forkPoint(p)
	}
}
// forkPoint handles one point received on the TaskMaster's stream; runForking
// calls it once per emitted point. Presumably it forks the point out to the
// tasks that consume it (confirm; the body is elided in this excerpt).
func (tm *TaskMaster) forkPoint(p edge.PointMessage) {
// NOTE(review): implementation omitted from this walkthrough.
...
}
// EmitPoint returns the next point from the underlying edge. The boolean is
// false, with a nil point, once the edge's Emit reports no further messages.
// It panics if the edge yields a message that is not an edge.PointMessage,
// because a stream edge is expected to carry only points.
func (s *streamEdge) EmitPoint() (edge.PointMessage, bool) {
	msg, more := s.edge.Emit()
	if !more {
		return nil, false
	}
	if point, isPoint := msg.(edge.PointMessage); isPoint {
		return point, true
	}
	panic("impossible to receive non PointMessage message")
}