
Custom logrus hooks in Go: a log-rotation hook, an email-alert hook, and a Kafka hook.

陆海阳
2023-12-01

Analysis of the logrus Hook interface

  • The logrus Hook interface definition is very simple, as shown below:
package logrus

// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}

// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook

// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
	for _, level := range hook.Levels() {
		hooks[level] = append(hooks[level], hook)
	}
}

// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
	for _, hook := range hooks[level] {
		if err := hook.Fire(entry); err != nil {
			return err
		}
	}

	return nil
}

You only need to implement this interface:

type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}

and the logrus framework will walk the registered hooks and call each one's Fire method whenever an entry is logged at a matching level.
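
For example, a minimal custom hook that mirrors error-and-above entries to stderr could look like the sketch below (the StderrHook name and output target are illustrative, not part of logrus):

// stderrhook.go (illustrative sketch)
package main

import (
	"fmt"
	"os"

	"github.com/sirupsen/logrus"
)

type StderrHook struct{}

// Levels restricts the hook to error level and above.
func (h *StderrHook) Levels() []logrus.Level {
	return []logrus.Level{logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel}
}

// Fire is called by logrus once per matching entry.
func (h *StderrHook) Fire(entry *logrus.Entry) error {
	_, err := fmt.Fprintf(os.Stderr, "[%s] %s\n", entry.Level, entry.Message)
	return err
}

func main() {
	log := logrus.New()
	log.AddHook(&StderrHook{})
	log.Error("something broke") // StderrHook.Fire runs for this entry
}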

Getting a logger instance

// log_hook.go
package logger

import (
	"fmt"
	"github.com/sirupsen/logrus"
	"library/util/constant"
	"os"
)


// Builds a *logrus.Logger with our custom hook attached.
func getLogger(module string) *logrus.Logger {
	// Instantiate.
	logger := logrus.New()
	// Set the output.
	logger.Out = os.Stdout
	// Set the log level.
	logger.SetLevel(logrus.DebugLevel)
	// A custom writer handles the files; the hook plumbing is left to lfshook.
	logger.AddHook(newLogrusHook(constant.GetLogPath(), module))

	// Set the log format.
	logger.SetFormatter(&logrus.JSONFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
	})
	return logger
}

// GetNewFieldLoggerContext ensures every caller gets its own logger and file.
func GetNewFieldLoggerContext(module, appField string) *logrus.Entry {
	logger := getLogger(module)
	return logger.WithFields(logrus.Fields{
		"app": appField,
	})
}

// SubscribeLog subscribes receivers to alert-level log entries.
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
	logger := entry.Logger
	logger.AddHook(newSubScribeHook(subMap))
	fmt.Println("log subscription registered")
}

constant.GetLogPath() can be replaced with your own log output directory; on my Mac, for example, it is /usr/local/log, so substitute accordingly.
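
The constant package itself is not shown in this article; here is a minimal sketch of what it might contain (the path and topic value below are placeholders, not the original ones):

// constant/constant.go (a sketch; adjust to your environment)
package constant

// GetLogPath returns the root directory for log output.
func GetLogPath() string {
	return "/usr/local/log"
}

// KafkaLogElkTopic is the topic used by the Kafka hook later in this article.
const KafkaLogElkTopic = "log-elk"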

Log rotation hook

  • Code
// writer.go
package logger

import (
	"fmt"
	"github.com/pkg/errors"
	"io"
	"library/util"
	"os"
	"path/filepath"
	"sync"
	"time"
)

type LogWriter struct {
	logDir           string        // root directory for log files
	module           string        // module name
	curFileName      string        // file name currently assigned
	curBaseFileName  string        // file currently in use
	turnCateDuration time.Duration // rotation interval
	mutex            sync.RWMutex
	outFh            *os.File
}

func (w *LogWriter) Write(p []byte) (n int, err error) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	out, err := w.getWriter()
	if err != nil {
		return 0, errors.Wrap(err, "failed to fetch target io.Writer")
	}
	return out.Write(p)
}

// getFileName derives the target file from the current time, truncated to the
// rotation interval: <logDir>/<yyyy-mm-dd>/<module>_<hour>.
func (w *LogWriter) getFileName() string {
	base := time.Now().Truncate(w.turnCateDuration)
	return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
}

func (w *LogWriter) getWriter() (io.Writer, error) {
	// Check whether the time slice has produced a new file name; while it
	// is unchanged, keep writing to the already-open handle.
	fileName := w.getFileName()
	if w.outFh != nil && fileName == w.curBaseFileName {
		return w.outFh, nil
	}

	dirname := filepath.Dir(fileName)
	if err := os.MkdirAll(dirname, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
	}

	fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to open file %s", fileName)
	}
	if w.outFh != nil {
		w.outFh.Close()
	}
	w.outFh = fileHandler
	w.curBaseFileName = fileName
	w.curFileName = fileName

	return fileHandler, nil
}

func New(logPath, module string, duration time.Duration) *LogWriter {
	return &LogWriter{
		logDir:           logPath,
		module:           module,
		turnCateDuration: duration,
		curFileName:      "",
		curBaseFileName:  "",
	}
}
// hook.go
package logger

import (
	"github.com/rifflock/lfshook"
	"github.com/sirupsen/logrus"
	"time"
)
func newLogrusHook(logPath, module string) logrus.Hook {
	writer := New(logPath, module, time.Hour*2)

	lfsHook := lfshook.NewHook(lfshook.WriterMap{
		logrus.DebugLevel: writer,
		logrus.InfoLevel:  writer,
		logrus.WarnLevel:  writer,
		logrus.ErrorLevel: writer,
		logrus.FatalLevel: writer,
		logrus.PanicLevel: writer,
	}, &logrus.TextFormatter{DisableColors: true})

	// writer implements the io.Writer interface and decides, on each Write
	// call, which time-sliced file the entry goes to; lfshook consumes it
	// from its Fire method.
	return lfsHook
}
  • Test code
func TestGetLogger(t *testing.T) {
	lg := GetNewFieldLoggerContext("test", "d")
	lg.Logger.Info("hello") // any message will do
}

Analysis

The logger instance holds the custom io.Writer struct. When a hook's Fire method is consumed, the Write method is called; at that point the Truncate-based time-slicing logic decides which file to write to, creating a new file once a slice boundary has passed.
Note: the code in this article splits folders by day, and module logs inside each folder are further split every 2 hours. You can change this to split by module instead.
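
To see how the Truncate call maps timestamps to file names, here is a small standalone illustration (the times are examples):

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 2 * time.Hour
	t1 := time.Date(2023, 12, 1, 12, 5, 0, 0, time.UTC)
	t2 := time.Date(2023, 12, 1, 13, 55, 0, 0, time.UTC)
	t3 := time.Date(2023, 12, 1, 14, 1, 0, 0, time.UTC)

	fmt.Println(t1.Truncate(d).Format("15")) // "12" -> writes go to module_12
	fmt.Println(t2.Truncate(d).Format("15")) // "12" -> same file, no rotation
	fmt.Println(t3.Truncate(d).Format("15")) // "14" -> boundary crossed, rotates to module_14
}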

Email alert hook

  • Code
// subscribeHook.go
package logger

import (
	"fmt"
	"github.com/sirupsen/logrus"
	"library/email"
	"strings"
)

type SubscribeMap map[logrus.Level][]*email.Receiver

type SubscribeHook struct {
	subMap SubscribeMap
}

// A hook could be hand-rolled here as well; a third-party hook is used elsewhere.
func (h *SubscribeHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *SubscribeHook) Fire(entry *logrus.Entry) error {
	for level, receivers := range h.subMap {
		// Level matched: consume the entry.
		if level == entry.Level {
			if len(receivers) > 0 {
				email.SendEmail(receivers, fmt.Sprintf("%s:[system log alert]", entry.Level.String()),
					fmt.Sprintf("error content: %s", entry.Message))
			}
		}
	}
	return nil
}

func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap {
	subMap := SubscribeMap{}
	addressList := strings.Split(receiverStr, ";")
	var receivers []*email.Receiver
	for _, address := range addressList {
		receivers = append(receivers, &email.Receiver{Email: address})
	}
	subMap[level] = receivers
	return subMap
}

func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
	return &SubscribeHook{subMap}
}
// email.go
package email

import (
	"fmt"
	"gopkg.in/gomail.v2"
	"regexp"
	"strconv"
)

type Sender struct {
	User      string
	Password  string
	Host      string
	Port      int
	MailTo    []string
	Subject   string
	Content   string
}

type Receiver struct {
	Email    string
}

func (r *Receiver) Check() bool {
	pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` // matches an email address
	reg := regexp.MustCompile(pattern)
	return reg.MatchString(r.Email)
}

func (s *Sender) clean() {

}

// NewReceiver checks address validity and sets it as the sole recipient.
func (s *Sender) NewReceiver(email string) *Receiver {
	rec := &Receiver{Email: email}
	if rec.Check() {
		s.MailTo = []string{email}
		return rec
	}
	fmt.Printf("email check fail [%s]\n", email)
	return nil
}

func (s *Sender) NewReceivers(receivers []*Receiver) {
	for _, rec := range receivers {
		if rec.Check() {
			s.MailTo = append(s.MailTo, rec.Email)
		} else {
			fmt.Printf("email check fail [%s]\n", rec.Email)
		}
	}
}
// For a 163 mailbox, Password is the key issued when SMTP is enabled.
var m = Sender{User: "6666666@163.com", Password: "666666666", Host: "smtp.163.com", Port: 465}

func SendEmail(receivers []*Receiver, subject, content string) {
	m.NewReceivers(receivers)
	m.Subject = subject
	m.Content = content

	e := gomail.NewMessage()
	e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
	e.SetHeader("To", m.MailTo...)    // send to multiple recipients
	e.SetHeader("Subject", m.Subject) // subject line
	e.SetBody("text/html", m.Content) // message body
	d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
	if err := d.DialAndSend(e); err != nil {
		fmt.Printf("error: failed to send email! %s \n", err.Error())
	}
}
Usage

In the same way as the writer above, an email is sent whenever an error-level entry is written.

o.Logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribeSocket {
	logger.SubscribeLog(o.Logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
// o is the actual struct instance

Kafka hook

// kafka hook
package logger

import (
	"github.com/sirupsen/logrus"
	"library/kafka"
	"library/util/constant"
)

type KafKaHook struct {
	kafkaProducer *kafka.KafkaProducer
}

func (h *KafKaHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *KafKaHook) Fire(entry *logrus.Entry) error {
	h.kafkaProducer.SendMsgSync(entry.Message)
	return nil
}

func newKafkaHook() *KafKaHook {
	producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic, true)
	return &KafKaHook{kafkaProducer: producer}
}

To use it, simply call logger.AddHook(newKafkaHook()).
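
Putting it together, wiring the Kafka hook into a logger could look like this sketch (error handling omitted; it assumes the logger and kafka packages above):

logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
logger.AddHook(newKafkaHook()) // every entry's Message is produced to the topic

logger.WithField("app", "demo").Error("disk almost full") // delivered to Kafka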

Kafka module

  • Producer
// kafkaProducer.go
package kafka

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"time"
)

// GetKafkaAddress returns the broker address list.
func GetKafkaAddress() []string {
	return []string{"127.0.0.1:9092"}
}


// Synchronous production mode.
func SyncProducer(topic, message string) error {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
	if err != nil {
		return fmt.Errorf("sarama.NewSyncProducer err, message=%s", err)
	}
	defer p.Close()
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}
	part, offset, err := p.SendMessage(msg)
	if err != nil {
		return fmt.Errorf("send message err=%s", err)
	}
	fmt.Printf("send succeeded, partition=%d, offset=%d \n", part, offset)
	return nil
}


// Producer wrapper supporting both async and sync modes.
type KafkaProducer struct {
	topic         string
	asyncProducer sarama.AsyncProducer
	syncProducer  sarama.SyncProducer
	sync          bool
}

func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
	k := &KafkaProducer{
		topic: topic,
		sync:  sync,
	}
	if sync {
		k.initSync()
	} else {
		k.initAsync()
	}
	return k
}

func (k *KafkaProducer) initAsync() bool {
	if k.sync {
		fmt.Printf("sync producer can't call async func!\n")
		return false
	}
	config := sarama.NewConfig()
	// Wait until all replicas have persisted the message before responding.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Send messages to a random partition.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Whether to report successes and failures; only meaningful when
	// RequiredAcks above is not NoResponse.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// Kafka version in use; below V0_10_0_0 the message timestamp has no
	// effect, and producer and consumer must be configured consistently.
	// Note: with a wrong version setting, Kafka returns very odd errors
	// and messages fail to send.
	config.Version = sarama.V0_10_0_1

	producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
	if e != nil {
		fmt.Println(e)
		return false
	}
	k.asyncProducer = producer
	// AsyncClose belongs in application shutdown, not here: closing the
	// producer as soon as init returns would break every later send.
	go func() {
		for {
			select {
			case <-k.asyncProducer.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-k.asyncProducer.Errors():
				fmt.Printf("err: %s  \n", fail.Err.Error())
			}
		}
	}()

	return true
}

func (k *KafkaProducer) initSync() bool {
	if !k.sync {
		fmt.Println("async producer can't call sync func!")
		return false
	}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return false
	}
	k.syncProducer = p
	return true
}

func (k *KafkaProducer) SendMsgAsync(sendStr string) {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
	}

	// Convert the string into a byte encoder.
	msg.Value = sarama.ByteEncoder(sendStr)

	// Send via the producer's input channel.
	k.asyncProducer.Input() <- msg
}

func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}
	part, offset, err := k.syncProducer.SendMessage(msg)
	if err != nil {
		fmt.Printf("send failed: message(%s) err=%s \n", sendStr, err)
		return false
	}
	fmt.Printf("send succeeded: partition=%d, offset=%d \n", part, offset)
	return true
}

Call SendMsgSync or SendMsgAsync to produce messages; note that the sync flag passed at construction must match the method you call!
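
For example (a sketch using the types above; the topic name is illustrative):

// Synchronous: construct with sync=true, then call SendMsgSync.
pd := NewKafkaProducer("log-elk", true)
pd.SendMsgSync("hello kafka") // blocks until the broker acks

// Asynchronous: construct with sync=false, then call SendMsgAsync.
apd := NewKafkaProducer("log-elk", false)
apd.SendMsgAsync("hello kafka") // enqueued on the Input() channel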

  • Consumer group
// kafkaConsumerGroup.go

package kafka

import (
	"context"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"sync"
)

func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
	return &KafkaConsumerGroup{
		brokers:           GetKafkaAddress(),
		topics:            topics,
		group:             group,
		channelBufferSize: 2,
		ready:             make(chan bool),
		version:           "1.1.1",
		handler:           businessCall,
	}
}

// Consumer group: consumers sharing the same group.id are treated as one
// group. Each consumer must set a group id; each message is consumed by only
// one consumer within a group, but can be consumed by multiple groups.
type KafkaConsumerGroup struct {
	// Broker: a single Kafka server is one broker.
	brokers []string
	// Topic: a logical grouping of messages; messages of the same topic are
	// placed in one queue.
	topics            []string
	version           string
	ready             chan bool
	group             string
	channelBufferSize int
	// Business callback.
	handler func(message *sarama.ConsumerMessage) bool
}

func (k *KafkaConsumerGroup) Init() func() {
	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		fmt.Printf("Error parsing Kafka version: %v", err)
	}
	cfg := sarama.NewConfig()
	cfg.Version = version
	// Partition assignment strategy.
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// Where to start when no committed group offset is found (OffsetOldest = -2).
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Channel buffer length.
	cfg.ChannelBufferSize = k.channelBufferSize
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
	if err != nil {
		fmt.Printf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer func() {
			wg.Done()
			//util.HandlePanic("client.Consume panic", log.StandardLogger())
		}()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			// Check if the context was cancelled, signaling that the consumer should stop.
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()

	<-k.ready
	fmt.Printf("Sarama consumer up and running!... \n")
	// Ensure messages still in the channel are consumed when the system exits.
	return func() {
		cancel()
		wg.Wait()
		if err = client.Close(); err != nil {
			fmt.Printf("Error closing client: %v  \n", err)
		}
	}
}



// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(k.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	// Actual message consumption happens here.
	for message := range claim.Messages() {
		//msg := string(message.Value)
		//k.logger.Infof("kafka: %s", msg)

		if ok := k.handler(message); ok {
			// Advance the offset.
			session.MarkMessage(message, "")
		}
		//run.Run(msg)
	}
	return nil
}

Test code

func TestKafkaConsumerGroup_Init(t *testing.T) {
	//pd := NewKafkaProducer("test-fail", true)
	k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
		fmt.Println(string(message.Value))
		// Failure handling: only return true (marking the offset) once the
		// message has been re-produced successfully.
		//if ok := pd.SendMsgSync("666666"); ok {
		//	return true
		//}
		return false
	})
	consumerDone := k.Init()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
	fmt.Println("terminating: via signal")
	consumerDone()
}

Some compensation logic is built into this test: messages the handler fails to process are not marked, so their offsets are not advanced.

That covers the logrus-related hooks.
