heka 主程序（hekad）的启动源码位于 cmd/hekad/main.go，首先分析其中的 main 函数，源码如下：
func main() {
exitCode := 0
// `os.Exit` will skip any registered deferred functions, so to support
// exit codes we put it in the first registerred deferred (i.e. the last to
// run), we can set the exitCode and then call `return` to exit with an
// error code.
// 退出运行的函数
defer func() {
os.Exit(exitCode)
}()
// 从命令行中获取config配置文件,默认/etc/heka.toml
configPath := flag.String("config", filepath.FromSlash("/etc/hekad.toml"),
"Config file or directory. If directory is specified then all files "+
"in the directory will be loaded.")
version := flag.Bool("version", false, "Output version and exit")
// 解析命令行参数
flag.Parse()
// heka公共配置类
config := &HekadConfig{}
var err error
// cpu 分析文件名称
var cpuProfName string
// 内存分析文件名称
var memProfName string
if *version {
fmt.Println(VERSION)
return
}
// 加载heka公共参数,调用pipeline.ReplaceEnvsFile替换环境变量,并调用toml.Decode解析
config, err = LoadHekadConfig(*configPath)
if err != nil {
pipeline.LogError.Println("Error reading config: ", err)
exitCode = 1
return
}
// 设置日志级别
pipeline.LogInfo.SetFlags(config.LogFlags)
pipeline.LogError.SetFlags(config.LogFlags)
if config.SampleDenominator <= 0 {
pipeline.LogError.Println("'sample_denominator' value must be greater than 0.")
exitCode = 1
return
}
// 解析最大空闲时间
if _, err = time.ParseDuration(config.MaxPackIdle); err != nil {
pipeline.LogError.Printf("Can't parse `max_pack_idle` time duration: %s\n",
config.MaxPackIdle)
exitCode = 1
return
}
// 解析公共配置
globals, cpuProfName, memProfName := setGlobalConfigs(config)
// 生成跟目录
if err = os.MkdirAll(globals.BaseDir, 0755); err != nil {
pipeline.LogError.Printf("Error creating 'base_dir' %s: %s", config.BaseDir, err)
exitCode = 1
return
}
// 设置最大消息大小
if config.MaxMessageSize > 1024 {
message.SetMaxMessageSize(config.MaxMessageSize)
} else if config.MaxMessageSize > 0 {
pipeline.LogError.Println("Error: 'max_message_size' setting must be greater than 1024.")
exitCode = 1
return
}
// 进程文件
if config.PidFile != "" {
// 读取进程号
contents, err := ioutil.ReadFile(config.PidFile)
if err == nil {
// 转换进程号
pid, err := strconv.Atoi(strings.TrimSpace(string(contents)))
if err != nil {
pipeline.LogError.Printf("Error reading proccess id from pidfile '%s': %s",
config.PidFile, err)
exitCode = 1
return
}
// 根据进程号查找进程
process, err := os.FindProcess(pid)
// on Windows, err != nil if the process cannot be found
if runtime.GOOS == "windows" {
if err == nil {
pipeline.LogError.Printf("Process %d is already running.", pid)
exitCode = 1
return
}
} else if process != nil {
// err is always nil on POSIX, so we have to send the process
// a signal to check whether it exists
if err = process.Signal(syscall.Signal(0)); err == nil {
pipeline.LogError.Printf("Process %d is already running.", pid)
exitCode = 1
return
}
}
}
// 把进程号写入到进程文件
if err = ioutil.WriteFile(config.PidFile, []byte(strconv.Itoa(os.Getpid())),
0644); err != nil {
pipeline.LogError.Printf("Unable to write pidfile '%s': %s", config.PidFile, err)
exitCode = 1
}
pipeline.LogInfo.Printf("Wrote pid to pidfile '%s'", config.PidFile)
defer func() {
// 进程退出时,移除进程号记录文件
if err = os.Remove(config.PidFile); err != nil {
pipeline.LogError.Printf("Unable to remove pidfile '%s': %s", config.PidFile, err)
}
}()
}
if cpuProfName != "" {
// 创建cpu 分析文件
profFile, err := os.Create(cpuProfName)
if err != nil {
pipeline.LogError.Println(err)
exitCode = 1
return
}
// 开启CPU分析,默认30s得到一个cpu分析文件
pprof.StartCPUProfile(profFile)
defer func() {
pprof.StopCPUProfile()
profFile.Close()
}()
}
if memProfName != "" {
defer func() {
// 创建内存分析文件
profFile, err := os.Create(memProfName)
if err != nil {
pipeline.LogError.Fatalln(err)
}
// 退出时把堆信息写入到配置文件中
pprof.WriteHeapProfile(profFile)
profFile.Close()
}()
}
// Set up and load the pipeline configuration and start the daemon.
// 根据全局配置生成pipeline配置 NewPipelineConfig代码实现位于pipeline/config.go中
pipeconf := pipeline.NewPipelineConfig(globals)
// 加载配置文件或目录下所有配置
if err = loadFullConfig(pipeconf, configPath); err != nil {
pipeline.LogError.Println("Error reading config: ", err)
exitCode = 1
return
}
// 启动pipeline,代码位于pipeline/pipeline_runner.go(263行)
exitCode = pipeline.Run(pipeconf)
}
在分析 pipeline 的启动流程之前，首先要熟悉一下项目中 pipeline/config.go 里的 PipelineConfig 结构体（Go 中没有"类"）及其主要方法：
// PipelineConfig holds the runtime state of a Heka pipeline. This covers:
//   - the global settings and the registered plugin makers;
//   - the running input/filter/output runners and the message router;
//   - the PipelinePack recycle channels;
//   - the locks and WaitGroups used to coordinate startup and shutdown.
type PipelineConfig struct {
	// Heka global configuration.
	Globals *GlobalConfigStruct
	// Plugin factory functions, keyed first by plugin category and then by
	// plugin name. The original notes list the categories as Input, Decoder,
	// Filter, Encoder, Output and Splitter — TODO confirm.
	makers map[string]map[string]PluginMaker
	// Direct access to makers["Decoder"] since it's needed by MultiDecoder
	// outside of the pipeline package.
	DecoderMakers map[string]PluginMaker
	// Mutex protecting the makers map.
	makersLock sync.RWMutex
	// All running InputRunners, by name.
	InputRunners map[string]InputRunner
	// All running FilterRunners, by name.
	FilterRunners map[string]FilterRunner
	// All running OutputRunners, by name.
	OutputRunners map[string]OutputRunner
	// Heka message router instance.
	router *messageRouter
	// PipelinePack supply for Input plugins.
	inputRecycleChan chan *PipelinePack
	// PipelinePack supply for Filter plugins (separate pool prevents
	// deadlocks).
	injectRecycleChan chan *PipelinePack
	// Stores log messages generated by plugin config errors.
	LogMsgs []string
	// Lock protecting access to the set of running filters so dynamic filters
	// can be safely added and removed while Heka is running.
	filtersLock sync.RWMutex
	// Is freed when all FilterRunners have stopped.
	filtersWg sync.WaitGroup
	// Is freed when all DecoderRunners have stopped.
	decodersWg sync.WaitGroup
	// Slice providing access to all running DecoderRunners.
	allDecoders []DecoderRunner
	// Mutex protecting allDecoders.
	allDecodersLock sync.RWMutex
	// Slice providing access to all Decoders called synchronously by InputRunner
	allSyncDecoders []ReportingDecoder
	// Mutex protecting allSyncDecoders
	allSyncDecodersLock sync.RWMutex
	// Slice providing access to all SplitterRunners.
	allSplitters []SplitterRunner
	// Mutex protecting AllSplitters
	allSplittersLock sync.RWMutex
	// Map providing access to all instantiated Encoders, keyed by name.
	// Encoders serialize messages; they are not encryption.
	allEncoders map[string]Encoder
	// Mutex protecting allEncoders.
	allEncodersLock sync.RWMutex
	// Name of host on which Heka is running (a host name, not an IP address).
	hostname string
	// Heka process id.
	pid int32
	// Lock protecting access to the set of running inputs so they
	// can be safely added while Heka is running.
	inputsLock sync.RWMutex
	// Is freed when all Input runners have stopped.
	inputsWg sync.WaitGroup
	// Lock protecting access to running outputs so they can be removed
	// safely.
	outputsLock sync.RWMutex
	// Internal reporting channel: recycle channel for the PipelinePack used
	// for internal reports.
	reportRecycleChan chan *PipelinePack
	// The next few values are used only during the initial configuration
	// loading process.
	// Track default plugin registration.
	defaultConfigs map[string]bool
	// Loaded PluginMakers sorted by category.
	makersByCategory map[string][]PluginMaker
	// Number of config loading errors.
	errcnt uint
}
type PipelineConfig struct {
// Pipeline message 添加时间戳,主机,进程号,传递次数
PipelinePack(msgLoopCount uint) (*PipelinePack, error)
// 返回Message路由
Router() MessageRouter
// 返回当前inputRecycleChan管道
InputRecycleChan() chan *PipelinePack
// 返回当前injectRecycleChan管道
InjectRecycleChan() chan *PipelinePack
// 返回主机名
Hostname() string
// 返回Output插件的Runner
Output(name string) (oRunner OutputRunner, ok bool)
PipelineConfig() *PipelineConfig
// 返回Decoder插件实例
Decoder(name string) (decoder Decoder, ok bool)
// 创建Decoder Runner实例,并starts启动
DecoderRunner(baseName, fullName string)
// 停止和unregisters注销DecoderRunner
StopDecoderRunner(dRunner DecoderRunner) (ok bool)
// 创建Encoder实例
Encoder(baseName, fullName string) (Encoder, bool)
// 从Map中获取Filter
Filter(name string) (fRunner FilterRunner, ok bool)
// 根据InputRunner的名字获取StatAccumulator input plugin
StatAccumulator(name string) (statAccum StatAccumulator,
err error)
// 添加FilterRunner到PipelineConfig
AddFilterRunner(fRunner FilterRunner) error
// 移除FilterRunner
RemoveFilterRunner(name string) bool
// 增加InputRunner
AddInputRunner(iRunner InputRunner) error
// 移除InputRunner
RemoveInputRunner(iRunner InputRunner)
// 移除OutputRunner
RemoveOutputRunner(oRunner OutputRunner)
// 注册默认的插件makers
RegisterDefault(name string) error
// 预先加载配置文件中的所有插件的配置
PreloadFromConfigFile(filename string) error
// 加载所有的没有预先加载默认插件
LoadConfig() error
}
主程序最后调用了 pipeline.Run 函数。该函数负责启动 OutputRunner、FilterRunner、InputTracker、router、InputRunner 等组件，并在收到退出信号后按与启动相反的顺序关闭它们。
主要启动步骤:
1. 启动OutputRunners
2. 启动FilterRunners
3. 启动 inputTracker 和 injectTracker（诊断跟踪器，分别登记输入池和注入池中的 PipelinePack）
4. 启动router(路由匹配器)
5. 启动InputRunners
代码位于pipeline/pipeline_runner.go(263行)
// Run drives Heka execution once the configuration has been loaded.
//
// Startup, in order:
//   - start every OutputRunner and FilterRunner;
//   - finish initializing the router's matchers;
//   - fill the input and inject PipelinePack pools (globals.PoolSize packs
//     each) and start their diagnostic trackers;
//   - start the router, then every InputRunner.
//
// It then handles OS signals until a shutdown is requested. Finally it stops
// the components in roughly the reverse order: inputs, decoders, filters,
// outputs, encoders.
//
// A runner whose Start fails is skipped. If that runner is not stoppable
// (IsStoppable() returns false), a shutdown with exit code 1 is requested via
// globals.ShutDown(1).
//
// Returns globals.exitCode, which main uses as the process exit code.
func Run(config *PipelineConfig) (exitCode int) {
	LogInfo.Println("Starting hekad...")
	// Each output's Start receives &outputsWg; shutdown waits on it after the
	// output matchers have been removed from the router.
	var outputsWg sync.WaitGroup
	var err error
	globals := config.Globals
	// Start every configured OutputRunner.
	for name, output := range config.OutputRunners {
		// Count the runner before Start; undone below if Start fails.
		outputsWg.Add(1)
		// config is passed to Start; the original notes say *PipelineConfig
		// implements the PluginHelper interface.
		if err = output.Start(config, &outputsWg); err != nil {
			LogError.Printf("Output '%s' failed to start: %s", name, err)
			outputsWg.Done()
			if !output.IsStoppable() {
				globals.ShutDown(1)
			}
			continue
		}
		LogInfo.Println("Output started:", name)
	}
	// Start every configured FilterRunner.
	for name, filter := range config.FilterRunners {
		// Count the runner before Start; undone below if Start fails.
		config.filtersWg.Add(1)
		if err = filter.Start(config, &config.filtersWg); err != nil {
			LogError.Printf("Filter '%s' failed to start: %s", name, err)
			config.filtersWg.Done()
			if !filter.IsStoppable() {
				globals.ShutDown(1)
			}
			continue
		}
		LogInfo.Println("Filter started:", name)
	}
	// Finish initializing the router's matchers.
	config.router.initMatchSlices()
	// Setup the diagnostic trackers for the input and inject pack pools.
	inputTracker := NewDiagnosticTracker("input", globals)
	injectTracker := NewDiagnosticTracker("inject", globals)
	// Create the report pipeline pack and put it on its recycle channel.
	config.reportRecycleChan <- NewPipelinePack(config.reportRecycleChan)
	// Create globals.PoolSize packs for each of the input and inject pools.
	// Each pack is registered with its tracker and placed on the matching
	// recycle channel.
	for i := 0; i < globals.PoolSize; i++ {
		inputPack := NewPipelinePack(config.inputRecycleChan)
		inputTracker.AddPack(inputPack)
		config.inputRecycleChan <- inputPack
		injectPack := NewPipelinePack(config.injectRecycleChan)
		injectTracker.AddPack(injectPack)
		config.injectRecycleChan <- injectPack
	}
	// Run the trackers in their own goroutines (not subprocesses).
	go inputTracker.Run()
	go injectTracker.Run()
	// Start the message router.
	config.router.Start()
	// Start every configured InputRunner last, once everything downstream
	// of it is running.
	for name, input := range config.InputRunners {
		// Count the runner before Start; undone below if Start fails.
		config.inputsWg.Add(1)
		if err = input.Start(config, &config.inputsWg); err != nil {
			LogError.Printf("Input '%s' failed to start: %s", name, err)
			config.inputsWg.Done()
			if !input.IsStoppable() {
				globals.ShutDown(1)
			}
			continue
		}
		LogInfo.Println("Input started:", name)
	}
	// wait for sigint
	// Route SIGINT, SIGTERM, SIGHUP, SIGUSR1 and SIGUSR2 to globals.sigChan.
	signal.Notify(globals.sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP,
		SIGUSR1, SIGUSR2)
	// Handle signals until a shutdown has been requested. The select has a
	// single case, so each iteration blocks until the next signal arrives, and
	// IsShuttingDown is only re-checked after a signal has been handled.
	// NOTE(review): this relies on shutdown requests (e.g. globals.ShutDown)
	// also delivering something on sigChan — confirm.
	for !globals.IsShuttingDown() {
		select {
		case sig := <-globals.sigChan:
			switch sig {
			case syscall.SIGHUP:
				// Post a RELOAD notification.
				LogInfo.Println("Reload initiated.")
				if err := notify.Post(RELOAD, nil); err != nil {
					LogError.Println("Error sending reload event: ", err)
				}
			case syscall.SIGINT, syscall.SIGTERM:
				// Begin shutdown.
				LogInfo.Println("Shutdown initiated.")
				globals.stop()
			case SIGUSR1:
				// Print the queue report to stdout in a separate goroutine.
				LogInfo.Println("Queue report initiated.")
				go config.allReportsStdout()
			case SIGUSR2:
				LogInfo.Println("Sandbox abort initiated.")
				go sandboxAbort(config)
			}
		}
	}
	// Shutdown runs in roughly the reverse order of startup.
	// 1. Ask every input to stop (under inputsLock), then wait for them all.
	config.inputsLock.Lock()
	for _, input := range config.InputRunners {
		input.Input().Stop()
		LogInfo.Printf("Stop message sent to input '%s'", input.Name())
	}
	config.inputsLock.Unlock()
	config.inputsWg.Wait()
	// 2. Close each decoder's input channel, empty the decoder list, then
	//    wait for the decoders to exit.
	config.allDecodersLock.Lock()
	LogInfo.Println("Waiting for decoders shutdown")
	for _, decoder := range config.allDecoders {
		close(decoder.InChan())
		LogInfo.Printf("Stop message sent to decoder '%s'", decoder.Name())
	}
	config.allDecoders = config.allDecoders[:0]
	config.allDecodersLock.Unlock()
	config.decodersWg.Wait()
	LogInfo.Println("Decoders shutdown complete")
	// 3. Remove each filter's matcher from the router, then wait for filters.
	config.filtersLock.Lock()
	for _, filter := range config.FilterRunners {
		// needed for a clean shutdown without deadlocking or orphaning messages
		// 1. removes the matcher from the router
		// 2. closes the matcher input channel and lets it drain
		// 3. closes the filter input channel and lets it drain
		// 4. exits the filter
		config.router.RemoveFilterMatcher() <- filter.MatchRunner()
		LogInfo.Printf("Stop message sent to filter '%s'", filter.Name())
	}
	config.filtersLock.Unlock()
	config.filtersWg.Wait()
	// 4. Remove each output's matcher from the router, then wait for outputs.
	// NOTE(review): unlike inputs and filters, OutputRunners is iterated
	// without outputsLock — confirm no concurrent RemoveOutputRunner can run
	// at this point.
	for _, output := range config.OutputRunners {
		config.router.RemoveOutputMatcher() <- output.MatchRunner()
		LogInfo.Printf("Stop message sent to output '%s'", output.Name())
	}
	outputsWg.Wait()
	// 5. Stop every encoder that implements NeedsStopping.
	// NOTE(review): allEncoders is read without allEncodersLock.
	for name, encoder := range config.allEncoders {
		if stopper, ok := encoder.(NeedsStopping); ok {
			LogInfo.Printf("Stopping encoder '%s'", name)
			stopper.Stop()
		}
	}
	LogInfo.Println("Shutdown complete.")
	return globals.exitCode
}
iRunner 结构体实现了 InputRunner 接口，foRunner 结构体同时实现了 OutputRunner 和 FilterRunner 接口，相关代码位于 pipeline/plugin_runners.go 文件中。本节到此为止，后续章节将分析具体 Input、Filter、Output 插件的加载过程。