当前位置: 首页 > 工具软件 > Heka > 使用案例 >

heka 0.11.0源码分析--主要启动流程分析

钱承允
2023-12-01

heka主程序的启动源码位于cmd/hekad/main.go,下面先来分析main函数的源码:

主程序代码分析

// main is the hekad entry point. It parses the command line, loads and
// validates the hekad configuration, optionally claims a pidfile and sets up
// CPU / heap profiling, then builds the pipeline configuration and hands
// control to pipeline.Run. The process exit status is whatever ends up in
// exitCode when main returns.
func main() {
	exitCode := 0
	// `os.Exit` skips any registered deferred functions, so to support exit
	// codes it is called from the first registered defer (i.e. the last one
	// to run). Everything below sets exitCode and then simply returns.
	defer func() {
		os.Exit(exitCode)
	}()

	// Command line flags; the config path defaults to /etc/hekad.toml.
	configPath := flag.String("config", filepath.FromSlash("/etc/hekad.toml"),
		"Config file or directory. If directory is specified then all files "+
			"in the directory will be loaded.")
	version := flag.Bool("version", false, "Output version and exit")
	flag.Parse()

	config := &HekadConfig{}
	var err error
	// File names for the CPU and heap profiles; empty means profiling is off.
	var cpuProfName string
	var memProfName string

	if *version {
		fmt.Println(VERSION)
		return
	}

	// Load the hekad-level settings from the given config path.
	config, err = LoadHekadConfig(*configPath)
	if err != nil {
		pipeline.LogError.Println("Error reading config: ", err)
		exitCode = 1
		return
	}

	// Apply the configured log flags to both loggers.
	pipeline.LogInfo.SetFlags(config.LogFlags)
	pipeline.LogError.SetFlags(config.LogFlags)
	if config.SampleDenominator <= 0 {
		pipeline.LogError.Println("'sample_denominator' value must be greater than 0.")
		exitCode = 1
		return
	}

	// Only validate that max_pack_idle parses as a duration; the parsed
	// value itself is discarded here.
	if _, err = time.ParseDuration(config.MaxPackIdle); err != nil {
		pipeline.LogError.Printf("Can't parse `max_pack_idle` time duration: %s\n",
			config.MaxPackIdle)
		exitCode = 1
		return
	}

	// Build the global settings and pick up the profile file names.
	globals, cpuProfName, memProfName := setGlobalConfigs(config)

	// Make sure the base directory exists.
	if err = os.MkdirAll(globals.BaseDir, 0755); err != nil {
		pipeline.LogError.Printf("Error creating 'base_dir' %s: %s", config.BaseDir, err)
		exitCode = 1
		return
	}

	// A max message size above 1024 is applied, a value in 1..1024 is
	// rejected, and a value <= 0 leaves the current setting untouched.
	if config.MaxMessageSize > 1024 {
		message.SetMaxMessageSize(config.MaxMessageSize)
	} else if config.MaxMessageSize > 0 {
		pipeline.LogError.Println("Error: 'max_message_size' setting must be greater than 1024.")
		exitCode = 1
		return
	}

	if config.PidFile != "" {
		// If a pidfile already exists, refuse to start while the process it
		// names is still alive. An unreadable pidfile is not an error.
		contents, err := ioutil.ReadFile(config.PidFile)
		if err == nil {
			pid, err := strconv.Atoi(strings.TrimSpace(string(contents)))
			if err != nil {
				pipeline.LogError.Printf("Error reading proccess id from pidfile '%s': %s",
					config.PidFile, err)
				exitCode = 1
				return
			}

			process, err := os.FindProcess(pid)

			// on Windows, err != nil if the process cannot be found
			if runtime.GOOS == "windows" {
				if err == nil {
					pipeline.LogError.Printf("Process %d is already running.", pid)
					exitCode = 1
					return
				}
			} else if process != nil {
				// err is always nil on POSIX, so we have to send the process
				// a signal to check whether it exists
				if err = process.Signal(syscall.Signal(0)); err == nil {
					pipeline.LogError.Printf("Process %d is already running.", pid)
					exitCode = 1
					return
				}
			}
		}

		// Record our own pid. A failed write aborts startup: continuing
		// would log a false "Wrote pid" message and the exit code set here
		// would later be overwritten by the result of pipeline.Run.
		if err = ioutil.WriteFile(config.PidFile, []byte(strconv.Itoa(os.Getpid())),
			0644); err != nil {

			pipeline.LogError.Printf("Unable to write pidfile '%s': %s", config.PidFile, err)
			exitCode = 1
			return
		}
		pipeline.LogInfo.Printf("Wrote pid to pidfile '%s'", config.PidFile)
		defer func() {
			// Remove the pidfile again when main returns.
			if err = os.Remove(config.PidFile); err != nil {
				pipeline.LogError.Printf("Unable to remove pidfile '%s': %s", config.PidFile, err)
			}
		}()
	}

	if cpuProfName != "" {
		profFile, err := os.Create(cpuProfName)
		if err != nil {
			pipeline.LogError.Println(err)
			exitCode = 1
			return
		}
		// StartCPUProfile reports an error (e.g. profiling already enabled);
		// do not silently run without the requested profile.
		if err = pprof.StartCPUProfile(profFile); err != nil {
			pipeline.LogError.Println(err)
			profFile.Close()
			exitCode = 1
			return
		}
		defer func() {
			pprof.StopCPUProfile()
			profFile.Close()
		}()
	}

	if memProfName != "" {
		defer func() {
			// The heap profile is written once, when main returns.
			profFile, err := os.Create(memProfName)
			if err != nil {
				// Not Fatalln: that would call os.Exit right here and skip
				// the remaining defers (CPU profile stop, pidfile removal).
				// The exit status stays 1, as it was with Fatalln.
				pipeline.LogError.Println(err)
				exitCode = 1
				return
			}
			if err = pprof.WriteHeapProfile(profFile); err != nil {
				pipeline.LogError.Println(err)
			}
			profFile.Close()
		}()
	}

	// Set up and load the pipeline configuration and start the daemon.
	pipeconf := pipeline.NewPipelineConfig(globals)
	if err = loadFullConfig(pipeconf, configPath); err != nil {
		pipeline.LogError.Println("Error reading config: ", err)
		exitCode = 1
		return
	}
	// Blocks until the pipeline shuts down; its result becomes the exit code.
	exitCode = pipeline.Run(pipeconf)
}

PipelineConfig struct

在分析pipeline启动之前,首先要熟悉一下pipeline/config.go中的PipelineConfig结构体

// PipelineConfig is the central state of a running Heka pipeline: the global
// settings, the plugin makers and runners, the PipelinePack recycle channels,
// and the locks and wait groups used to coordinate them.
type PipelineConfig struct {
	// Heka global configuration.
	Globals *GlobalConfigStruct
	// PluginMakers stored in a two-level map. The outer key is the plugin
	// category (e.g. makers["Decoder"]); the inner key is presumably the
	// plugin name -- confirm against the code that fills this map.
	makers map[string]map[string]PluginMaker
	// Direct access to makers["Decoder"] since it's needed by MultiDecoder
	// outside of the pipeline package.
	DecoderMakers map[string]PluginMaker
	// Mutex protecting the makers map.
	makersLock sync.RWMutex
	// All running InputRunners, by name.
	InputRunners map[string]InputRunner
	// All running FilterRunners, by name.
	FilterRunners map[string]FilterRunner
	// All running OutputRunners, by name.
	OutputRunners map[string]OutputRunner
	// Heka message router instance.
	router *messageRouter
	// PipelinePack supply for Input plugins.
	inputRecycleChan chan *PipelinePack
	// PipelinePack supply for Filter plugins (separate pool prevents
	// deadlocks).
	injectRecycleChan chan *PipelinePack
	// Stores log messages generated by plugin config errors.
	LogMsgs []string
	// Lock protecting access to the set of running filters so dynamic filters
	// can be safely added and removed while Heka is running.
	filtersLock sync.RWMutex
	// Is freed when all FilterRunners have stopped.
	filtersWg sync.WaitGroup
	// Is freed when all DecoderRunners have stopped.
	decodersWg sync.WaitGroup
	// Slice providing access to all running DecoderRunners.
	allDecoders []DecoderRunner
	// Mutex protecting allDecoders.
	allDecodersLock sync.RWMutex

	// Slice providing access to all Decoders called synchronously by InputRunner
	allSyncDecoders []ReportingDecoder
	// Mutex protecting allSyncDecoders
	allSyncDecodersLock sync.RWMutex

	// Slice providing access to all Splitters
	allSplitters []SplitterRunner
	// Mutex protecting allSplitters
	allSplittersLock sync.RWMutex

	// Map providing access to all instantiated Encoders (presumably keyed by
	// encoder name -- confirm against the code that fills this map).
	allEncoders map[string]Encoder
	// Mutex protecting allEncoders.
	allEncodersLock sync.RWMutex
	// Name of host on which Heka is running.
	hostname string
	// Heka process id.
	pid int32
	// Lock protecting access to the set of running inputs so they
	// can be safely added while Heka is running.
	inputsLock sync.RWMutex
	// Is freed when all Input runners have stopped.
	inputsWg sync.WaitGroup
	// Lock protecting access to running outputs so they can be removed
	// safely.
	outputsLock sync.RWMutex
	// Internal reporting channel.
	reportRecycleChan chan *PipelinePack

	// The next few values are used only during the initial configuration
	// loading process.

	// Track default plugin registration.
	defaultConfigs map[string]bool
	// Loaded PluginMakers sorted by category.
	makersByCategory map[string][]PluginMaker
	// Number of config loading errors.
	errcnt uint
}

PipelineConfig 方法

// Overview of the methods defined on PipelineConfig.
// NOTE(review): this is an illustrative listing only -- method signatures are
// written inside struct syntax, so it is not compilable Go. The one-line
// descriptions are translated from the original annotations and cannot be
// verified from this listing alone.
type PipelineConfig struct {
   // Supplies a PipelinePack; described as stamping the message with
   // timestamp, hostname and pid and recording the given loop count.
   PipelinePack(msgLoopCount uint) (*PipelinePack, error) 
   // Returns the message router.
   Router() MessageRouter
   // Returns the inputRecycleChan channel.
   InputRecycleChan() chan *PipelinePack
   // Returns the injectRecycleChan channel.
   InjectRecycleChan() chan *PipelinePack
   // Returns the host name.
   Hostname() string
   // Looks up the OutputRunner registered under name.
   Output(name string) (oRunner OutputRunner, ok bool)
   // Returns a *PipelineConfig (presumably the receiver itself -- confirm).
   PipelineConfig() *PipelineConfig
   // Returns a Decoder plugin instance for name.
   Decoder(name string) (decoder Decoder, ok bool)
   // Creates a DecoderRunner and starts it.
   // NOTE(review): the return values are missing from this listing.
   DecoderRunner(baseName, fullName string)
   // Stops and unregisters the given DecoderRunner.
   StopDecoderRunner(dRunner DecoderRunner) (ok bool)
   // Creates an Encoder instance.
   Encoder(baseName, fullName string) (Encoder, bool) 
   // Looks up the FilterRunner registered under name.
   Filter(name string) (fRunner FilterRunner, ok bool)
   // Looks up the StatAccumulator input plugin by the name of its
   // InputRunner.
   StatAccumulator(name string) (statAccum StatAccumulator,
	err error)
   // Adds a FilterRunner to the PipelineConfig.
   AddFilterRunner(fRunner FilterRunner) error
   // Removes the FilterRunner registered under name.
   RemoveFilterRunner(name string) bool
   // Adds an InputRunner.
   AddInputRunner(iRunner InputRunner) error
   // Removes an InputRunner.
   RemoveInputRunner(iRunner InputRunner)
   // Removes an OutputRunner.
   RemoveOutputRunner(oRunner OutputRunner)
   // Registers the default maker for the named plugin.
   RegisterDefault(name string) error
   // Pre-loads the configuration of every plugin found in the config file.
   PreloadFromConfigFile(filename string) error
   // Finishes loading, including default plugins that were not pre-loaded.
   LoadConfig() error
}

pipeline启动

主程序最后调用了pipeline的Run方法,此方法主要启动OutputRunner,FilterRunner,inputTracker/injectTracker,router,InputRunner等
主要启动步骤:
1. 启动OutputRunners
2. 启动FilterRunners
3. 启动inputTracker和injectTracker(跟踪有关)
4. 启动router(路由匹配器)
5. 启动InputRunners

代码位于pipeline/pipeline_runner.go(263行)

// Main function driving Heka execution. Loads config, initializes PipelinePack
// pools, and starts all the runners. Then it listens for signals and drives
// the shutdown process when that is triggered.
// Run is the main function driving Heka execution. It starts every output
// and filter runner, finishes router setup, fills the PipelinePack pools,
// starts the diagnostic trackers, the router and the input runners, and then
// blocks handling signals until a shutdown is flagged. Shutdown stops inputs,
// decoders, filters, outputs and finally any encoder that needs stopping.
// The returned exit code is read from the globals once shutdown completes.
func Run(config *PipelineConfig) (exitCode int) {
	LogInfo.Println("Starting hekad...")

	g := config.Globals

	// Handed to every output's Start call and waited on during shutdown.
	var outputsWg sync.WaitGroup

	// Outputs are started first. A runner that fails to start gives its
	// wait group slot back; if it is not stoppable a shutdown is requested.
	for outName, outRunner := range config.OutputRunners {
		outputsWg.Add(1)
		startErr := outRunner.Start(config, &outputsWg)
		if startErr == nil {
			LogInfo.Println("Output started:", outName)
			continue
		}
		LogError.Printf("Output '%s' failed to start: %s", outName, startErr)
		outputsWg.Done()
		if !outRunner.IsStoppable() {
			g.ShutDown(1)
		}
	}

	// Filters follow the same pattern, tracked by config.filtersWg.
	for filterName, filterRunner := range config.FilterRunners {
		config.filtersWg.Add(1)
		startErr := filterRunner.Start(config, &config.filtersWg)
		if startErr == nil {
			LogInfo.Println("Filter started:", filterName)
			continue
		}
		LogError.Printf("Filter '%s' failed to start: %s", filterName, startErr)
		config.filtersWg.Done()
		if !filterRunner.IsStoppable() {
			g.ShutDown(1)
		}
	}

	// Finish initializing the router's matchers.
	config.router.initMatchSlices()

	// Diagnostic trackers, one per pack pool.
	inputTracker := NewDiagnosticTracker("input", g)
	injectTracker := NewDiagnosticTracker("inject", g)

	// The report channel gets a single pack of its own.
	config.reportRecycleChan <- NewPipelinePack(config.reportRecycleChan)

	// Fill the input and inject pools with PoolSize packs each, registering
	// every pack with the matching tracker before it enters its channel.
	for n := 0; n < g.PoolSize; n++ {
		inPack := NewPipelinePack(config.inputRecycleChan)
		inputTracker.AddPack(inPack)
		config.inputRecycleChan <- inPack

		injPack := NewPipelinePack(config.injectRecycleChan)
		injectTracker.AddPack(injPack)
		config.injectRecycleChan <- injPack
	}

	// Trackers run in their own goroutines.
	go inputTracker.Run()
	go injectTracker.Run()

	config.router.Start()

	// Inputs are started last, tracked by config.inputsWg.
	for inName, inRunner := range config.InputRunners {
		config.inputsWg.Add(1)
		startErr := inRunner.Start(config, &config.inputsWg)
		if startErr == nil {
			LogInfo.Println("Input started:", inName)
			continue
		}
		LogError.Printf("Input '%s' failed to start: %s", inName, startErr)
		config.inputsWg.Done()
		if !inRunner.IsStoppable() {
			g.ShutDown(1)
		}
	}

	// Register for the signals handled below, then block on them until the
	// globals report that a shutdown is in progress.
	signal.Notify(g.sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP,
		SIGUSR1, SIGUSR2)

	for !g.IsShuttingDown() {
		switch sig := <-g.sigChan; sig {
		case syscall.SIGHUP:
			LogInfo.Println("Reload initiated.")
			postErr := notify.Post(RELOAD, nil)
			if postErr != nil {
				LogError.Println("Error sending reload event: ", postErr)
			}
		case syscall.SIGINT, syscall.SIGTERM:
			LogInfo.Println("Shutdown initiated.")
			g.stop()
		case SIGUSR1:
			// Reports are produced in a separate goroutine.
			LogInfo.Println("Queue report initiated.")
			go config.allReportsStdout()
		case SIGUSR2:
			LogInfo.Println("Sandbox abort initiated.")
			go sandboxAbort(config)
		}
	}

	// Shutdown, stage 1: ask every input to stop and wait for them.
	config.inputsLock.Lock()
	for _, inRunner := range config.InputRunners {
		inRunner.Input().Stop()
		LogInfo.Printf("Stop message sent to input '%s'", inRunner.Name())
	}
	config.inputsLock.Unlock()
	config.inputsWg.Wait()

	// Stage 2: close each decoder's input channel, empty the decoder list
	// and wait for the decoders.
	config.allDecodersLock.Lock()
	LogInfo.Println("Waiting for decoders shutdown")
	for _, decRunner := range config.allDecoders {
		close(decRunner.InChan())
		LogInfo.Printf("Stop message sent to decoder '%s'", decRunner.Name())
	}
	config.allDecoders = config.allDecoders[:0]
	config.allDecodersLock.Unlock()
	config.decodersWg.Wait()
	LogInfo.Println("Decoders shutdown complete")

	// Stage 3: filters. Handing the matcher to the router's removal channel
	// is needed for a clean shutdown without deadlocking or orphaning
	// messages:
	// 1. removes the matcher from the router
	// 2. closes the matcher input channel and lets it drain
	// 3. closes the filter input channel and lets it drain
	// 4. exits the filter
	config.filtersLock.Lock()
	for _, filterRunner := range config.FilterRunners {
		config.router.RemoveFilterMatcher() <- filterRunner.MatchRunner()
		LogInfo.Printf("Stop message sent to filter '%s'", filterRunner.Name())
	}
	config.filtersLock.Unlock()
	config.filtersWg.Wait()

	// Stage 4: outputs, via the router's output matcher removal channel.
	for _, outRunner := range config.OutputRunners {
		config.router.RemoveOutputMatcher() <- outRunner.MatchRunner()
		LogInfo.Printf("Stop message sent to output '%s'", outRunner.Name())
	}
	outputsWg.Wait()

	// Stage 5: stop every encoder that implements NeedsStopping.
	for encName, enc := range config.allEncoders {
		stopper, needsStop := enc.(NeedsStopping)
		if !needsStop {
			continue
		}
		LogInfo.Printf("Stopping encoder '%s'", encName)
		stopper.Stop()
	}

	LogInfo.Println("Shutdown complete.")
	return g.exitCode
}

OutputRunner, FilterRunner ,InputRunner等接口实现

type iRunner struct 实现了InputRunner接口,type foRunner struct同时实现了OutputRunner和FilterRunner接口,主要代码在pipeline/plugin_runners.go文件中。此节写到这里,后续编写具体Input,Filter,Output插件的加载。

 类似资料: